/**
 * Fast Neo4j Sync - Parallel batch sync with incremental support
 *
 * Much faster than sequential sync by:
 * 1. Using larger batches (500 instead of 100)
 * 2. Running multiple batches in parallel
 * 3. Using UNWIND for bulk operations
 * 4. Supporting incremental sync based on timestamps
 */
import neo4j, { Driver, Session } from 'neo4j-driver';
import { config } from 'dotenv';
import { resolve } from 'path';

// Load .env.production first; dotenv never overwrites variables that are
// already set, so the plain .env acts as a fallback for anything missing.
config({ path: resolve(process.cwd(), '.env.production') });
config({ path: resolve(process.cwd(), '.env') });
/** Connection details and tuning knobs for a sync run. */
interface SyncConfig {
  /** Bolt URI of the local (source) Neo4j instance. */
  localUri: string;
  localUser: string;
  localPassword: string;
  /** URI of the cloud AuraDB (target) instance. */
  cloudUri: string;
  cloudUser: string;
  cloudPassword: string;
  /** Number of items written to the cloud per UNWIND query. */
  batchSize: number;
  /** How many batches are dispatched concurrently (node sync only). */
  parallelBatches: number;
  /** When true, only items changed since the last recorded sync are copied. */
  incrementalOnly: boolean;
  /** Optional externally-supplied watermark; normally read from the cloud DB. */
  lastSyncTime?: Date;
}
/** Counters reported at the end of a sync run. */
interface SyncStats {
  // NOTE(review): the current implementation records every processed node
  // under nodesCreated (MERGE cannot distinguish create from update without
  // reading query counters), so nodesUpdated/relationshipsUpdated stay 0.
  nodesCreated: number;
  nodesUpdated: number;
  relationshipsCreated: number;
  relationshipsUpdated: number;
  /** Total wall-clock run time in milliseconds. */
  duration: number;
}
| class FastNeo4jSync { | |
| private localDriver: Driver; | |
| private cloudDriver: Driver; | |
| private config: SyncConfig; | |
| private stats: SyncStats = { | |
| nodesCreated: 0, | |
| nodesUpdated: 0, | |
| relationshipsCreated: 0, | |
| relationshipsUpdated: 0, | |
| duration: 0 | |
| }; | |
| constructor(config: SyncConfig) { | |
| this.config = config; | |
| this.localDriver = neo4j.driver( | |
| config.localUri, | |
| neo4j.auth.basic(config.localUser, config.localPassword) | |
| ); | |
| this.cloudDriver = neo4j.driver( | |
| config.cloudUri, | |
| neo4j.auth.basic(config.cloudUser, config.cloudPassword) | |
| ); | |
| } | |
| async connect(): Promise<void> { | |
| console.log('📡 Connecting to databases...'); | |
| const localSession = this.localDriver.session(); | |
| const cloudSession = this.cloudDriver.session(); | |
| try { | |
| await localSession.run('RETURN 1'); | |
| console.log(' ✓ Local Neo4j connected'); | |
| await cloudSession.run('RETURN 1'); | |
| console.log(' ✓ Cloud AuraDB connected'); | |
| } finally { | |
| await localSession.close(); | |
| await cloudSession.close(); | |
| } | |
| } | |
| async getLastSyncTime(): Promise<Date | null> { | |
| const cloudSession = this.cloudDriver.session(); | |
| try { | |
| const result = await cloudSession.run(` | |
| MATCH (s:_SyncMetadata {id: 'last_sync'}) | |
| RETURN s.timestamp as timestamp | |
| `); | |
| if (result.records.length > 0) { | |
| const ts = result.records[0].get('timestamp'); | |
| return ts ? new Date(ts) : null; | |
| } | |
| return null; | |
| } catch { | |
| return null; | |
| } finally { | |
| await cloudSession.close(); | |
| } | |
| } | |
| async setLastSyncTime(timestamp: Date): Promise<void> { | |
| const cloudSession = this.cloudDriver.session(); | |
| try { | |
| await cloudSession.run(` | |
| MERGE (s:_SyncMetadata {id: 'last_sync'}) | |
| SET s.timestamp = $timestamp | |
| `, { timestamp: timestamp.toISOString() }); | |
| } finally { | |
| await cloudSession.close(); | |
| } | |
| } | |
| async countNodesToSync(since?: Date): Promise<number> { | |
| const localSession = this.localDriver.session(); | |
| try { | |
| let query: string; | |
| let params: Record<string, unknown> = {}; | |
| if (since) { | |
| query = ` | |
| MATCH (n) | |
| WHERE n.updatedAt > $since OR n.createdAt > $since OR NOT exists(n.updatedAt) | |
| RETURN count(n) as count | |
| `; | |
| params = { since: since.toISOString() }; | |
| } else { | |
| query = 'MATCH (n) RETURN count(n) as count'; | |
| } | |
| const result = await localSession.run(query, params); | |
| return result.records[0].get('count').toNumber(); | |
| } finally { | |
| await localSession.close(); | |
| } | |
| } | |
| async countRelationshipsToSync(since?: Date): Promise<number> { | |
| const localSession = this.localDriver.session(); | |
| try { | |
| let query: string; | |
| let params: Record<string, unknown> = {}; | |
| if (since) { | |
| // For relationships, we sync if either end node was updated | |
| query = ` | |
| MATCH (a)-[r]->(b) | |
| WHERE a.updatedAt > $since OR b.updatedAt > $since | |
| OR a.createdAt > $since OR b.createdAt > $since | |
| OR NOT exists(a.updatedAt) | |
| RETURN count(r) as count | |
| `; | |
| params = { since: since.toISOString() }; | |
| } else { | |
| query = 'MATCH ()-[r]->() RETURN count(r) as count'; | |
| } | |
| const result = await localSession.run(query, params); | |
| return result.records[0].get('count').toNumber(); | |
| } finally { | |
| await localSession.close(); | |
| } | |
| } | |
| async syncNodesInParallel(since?: Date): Promise<void> { | |
| const localSession = this.localDriver.session(); | |
| const totalNodes = await this.countNodesToSync(since); | |
| console.log(`\n📦 Syncing ${totalNodes} nodes (batch: ${this.config.batchSize}, parallel: ${this.config.parallelBatches})...`); | |
| if (totalNodes === 0) { | |
| console.log(' ✓ No nodes to sync'); | |
| return; | |
| } | |
| let offset = 0; | |
| let processed = 0; | |
| const startTime = Date.now(); | |
| try { | |
| while (offset < totalNodes) { | |
| // Fetch multiple batches worth of nodes | |
| const fetchSize = this.config.batchSize * this.config.parallelBatches; | |
| let query: string; | |
| let params: Record<string, unknown> = { skip: neo4j.int(offset), limit: neo4j.int(fetchSize) }; | |
| if (since) { | |
| query = ` | |
| MATCH (n) | |
| WHERE n.updatedAt > $since OR n.createdAt > $since OR NOT exists(n.updatedAt) | |
| RETURN n, labels(n) as labels, elementId(n) as elementId | |
| SKIP $skip LIMIT $limit | |
| `; | |
| params.since = since.toISOString(); | |
| } else { | |
| query = ` | |
| MATCH (n) | |
| RETURN n, labels(n) as labels, elementId(n) as elementId | |
| SKIP $skip LIMIT $limit | |
| `; | |
| } | |
| const result = await localSession.run(query, params); | |
| const nodes = result.records.map(r => ({ | |
| properties: r.get('n').properties, | |
| labels: r.get('labels') as string[], | |
| elementId: r.get('elementId') as string | |
| })); | |
| if (nodes.length === 0) break; | |
| // Split into batches and process in parallel | |
| const batches: typeof nodes[] = []; | |
| for (let i = 0; i < nodes.length; i += this.config.batchSize) { | |
| batches.push(nodes.slice(i, i + this.config.batchSize)); | |
| } | |
| // Process batches in parallel | |
| await Promise.all(batches.map(batch => this.importNodeBatch(batch))); | |
| processed += nodes.length; | |
| offset += fetchSize; | |
| const elapsed = (Date.now() - startTime) / 1000; | |
| const rate = Math.round(processed / elapsed); | |
| const eta = Math.round((totalNodes - processed) / rate); | |
| console.log(` Progress: ${processed}/${totalNodes} (${rate} nodes/sec, ETA: ${eta}s)`); | |
| } | |
| this.stats.nodesCreated = processed; | |
| console.log(` ✓ Synced ${processed} nodes in ${((Date.now() - startTime) / 1000).toFixed(1)}s`); | |
| } finally { | |
| await localSession.close(); | |
| } | |
| } | |
| private async importNodeBatch(nodes: Array<{ properties: Record<string, unknown>; labels: string[]; elementId: string }>): Promise<void> { | |
| const cloudSession = this.cloudDriver.session(); | |
| try { | |
| // Group nodes by label combination for efficient UNWIND | |
| const nodesByLabels = new Map<string, typeof nodes>(); | |
| for (const node of nodes) { | |
| const labelKey = node.labels.sort().join(':'); | |
| if (!nodesByLabels.has(labelKey)) { | |
| nodesByLabels.set(labelKey, []); | |
| } | |
| nodesByLabels.get(labelKey)!.push(node); | |
| } | |
| // Import each label group | |
| for (const [labelKey, labelNodes] of nodesByLabels) { | |
| const labels = labelKey || 'Node'; | |
| const nodeData = labelNodes.map(n => ({ | |
| ...n.properties, | |
| _syncId: n.elementId | |
| })); | |
| // Use MERGE to handle both create and update | |
| await cloudSession.run(` | |
| UNWIND $nodes as nodeData | |
| MERGE (n:${labels} {_syncId: nodeData._syncId}) | |
| SET n = nodeData | |
| `, { nodes: nodeData }); | |
| } | |
| } finally { | |
| await cloudSession.close(); | |
| } | |
| } | |
| async syncRelationshipsInParallel(since?: Date): Promise<void> { | |
| const localSession = this.localDriver.session(); | |
| const totalRels = await this.countRelationshipsToSync(since); | |
| console.log(`\n🔗 Syncing ${totalRels} relationships...`); | |
| if (totalRels === 0) { | |
| console.log(' ✓ No relationships to sync'); | |
| return; | |
| } | |
| let offset = 0; | |
| let processed = 0; | |
| const startTime = Date.now(); | |
| try { | |
| while (offset < totalRels) { | |
| const fetchSize = this.config.batchSize * this.config.parallelBatches; | |
| let query: string; | |
| let params: Record<string, unknown> = { skip: neo4j.int(offset), limit: neo4j.int(fetchSize) }; | |
| if (since) { | |
| query = ` | |
| MATCH (a)-[r]->(b) | |
| WHERE a.updatedAt > $since OR b.updatedAt > $since | |
| OR a.createdAt > $since OR b.createdAt > $since | |
| OR NOT exists(a.updatedAt) | |
| RETURN type(r) as type, properties(r) as props, | |
| elementId(a) as startId, elementId(b) as endId | |
| SKIP $skip LIMIT $limit | |
| `; | |
| params.since = since.toISOString(); | |
| } else { | |
| query = ` | |
| MATCH (a)-[r]->(b) | |
| RETURN type(r) as type, properties(r) as props, | |
| elementId(a) as startId, elementId(b) as endId | |
| SKIP $skip LIMIT $limit | |
| `; | |
| } | |
| const result = await localSession.run(query, params); | |
| const rels = result.records.map(r => ({ | |
| type: r.get('type') as string, | |
| properties: r.get('props') as Record<string, unknown>, | |
| startId: r.get('startId') as string, | |
| endId: r.get('endId') as string | |
| })); | |
| if (rels.length === 0) break; | |
| // Split into batches and process in parallel | |
| const batches: typeof rels[] = []; | |
| for (let i = 0; i < rels.length; i += this.config.batchSize) { | |
| batches.push(rels.slice(i, i + this.config.batchSize)); | |
| } | |
| // Process batches sequentially to avoid deadlocks | |
| for (const batch of batches) { | |
| await this.importRelationshipBatch(batch); | |
| } | |
| processed += rels.length; | |
| offset += fetchSize; | |
| const elapsed = (Date.now() - startTime) / 1000; | |
| const rate = Math.round(processed / elapsed); | |
| const eta = Math.round((totalRels - processed) / rate); | |
| console.log(` Progress: ${processed}/${totalRels} (${rate} rels/sec, ETA: ${eta}s)`); | |
| } | |
| this.stats.relationshipsCreated = processed; | |
| console.log(` ✓ Synced ${processed} relationships in ${((Date.now() - startTime) / 1000).toFixed(1)}s`); | |
| } finally { | |
| await localSession.close(); | |
| } | |
| } | |
| private async importRelationshipBatch(rels: Array<{ type: string; properties: Record<string, unknown>; startId: string; endId: string }>): Promise<void> { | |
| const cloudSession = this.cloudDriver.session(); | |
| try { | |
| // Group by relationship type | |
| const relsByType = new Map<string, typeof rels>(); | |
| for (const rel of rels) { | |
| if (!relsByType.has(rel.type)) { | |
| relsByType.set(rel.type, []); | |
| } | |
| relsByType.get(rel.type)!.push(rel); | |
| } | |
| // Import each type | |
| for (const [relType, typeRels] of relsByType) { | |
| const relData = typeRels.map(r => ({ | |
| startId: r.startId, | |
| endId: r.endId, | |
| props: r.properties | |
| })); | |
| await cloudSession.run(` | |
| UNWIND $rels as relData | |
| MATCH (a {_syncId: relData.startId}) | |
| MATCH (b {_syncId: relData.endId}) | |
| MERGE (a)-[r:${relType}]->(b) | |
| SET r = relData.props | |
| `, { rels: relData }); | |
| } | |
| } finally { | |
| await cloudSession.close(); | |
| } | |
| } | |
| async syncSchema(): Promise<void> { | |
| console.log('\n📋 Syncing schema...'); | |
| const localSession = this.localDriver.session(); | |
| const cloudSession = this.cloudDriver.session(); | |
| try { | |
| // Get constraints from local - build CREATE statements from available fields | |
| const constraintsResult = await localSession.run('SHOW CONSTRAINTS'); | |
| for (const record of constraintsResult.records) { | |
| const name = record.get('name'); | |
| const constraintType = record.get('type'); | |
| const entityType = record.get('entityType'); | |
| const labelsOrTypes = record.get('labelsOrTypes') as string[]; | |
| const properties = record.get('properties') as string[]; | |
| if (!name || name.startsWith('_')) continue; | |
| // Build CREATE CONSTRAINT statement | |
| let createStatement = ''; | |
| const label = labelsOrTypes?.[0] || 'Node'; | |
| const prop = properties?.[0] || 'id'; | |
| if (constraintType === 'UNIQUENESS' && entityType === 'NODE') { | |
| createStatement = `CREATE CONSTRAINT ${name} IF NOT EXISTS FOR (n:${label}) REQUIRE n.${prop} IS UNIQUE`; | |
| } else if (constraintType === 'NODE_KEY') { | |
| createStatement = `CREATE CONSTRAINT ${name} IF NOT EXISTS FOR (n:${label}) REQUIRE n.${prop} IS NODE KEY`; | |
| } | |
| if (createStatement) { | |
| try { | |
| await cloudSession.run(createStatement); | |
| console.log(` ✓ Constraint: ${name}`); | |
| } catch (e: unknown) { | |
| const error = e as Error; | |
| if (!error.message?.includes('already exists') && !error.message?.includes('equivalent')) { | |
| console.log(` ⚠ Constraint ${name}: ${error.message?.slice(0, 50)}`); | |
| } | |
| } | |
| } | |
| } | |
| // Get indexes - exclude LOOKUP and internal indexes | |
| const indexesResult = await localSession.run('SHOW INDEXES'); | |
| for (const record of indexesResult.records) { | |
| const name = record.get('name'); | |
| const indexType = record.get('type'); | |
| const entityType = record.get('entityType'); | |
| const labelsOrTypes = record.get('labelsOrTypes') as string[]; | |
| const properties = record.get('properties') as string[]; | |
| if (!name || name.startsWith('_') || indexType === 'LOOKUP') continue; | |
| const label = labelsOrTypes?.[0] || 'Node'; | |
| const props = properties || []; | |
| let createStatement = ''; | |
| if (indexType === 'RANGE' && entityType === 'NODE' && props.length > 0) { | |
| const propList = props.map(p => `n.${p}`).join(', '); | |
| createStatement = `CREATE INDEX ${name} IF NOT EXISTS FOR (n:${label}) ON (${propList})`; | |
| } else if (indexType === 'VECTOR' && props.length > 0) { | |
| // Skip vector indexes - they need special handling | |
| console.log(` ⏭ Skipping vector index: ${name}`); | |
| continue; | |
| } | |
| if (createStatement) { | |
| try { | |
| await cloudSession.run(createStatement); | |
| console.log(` ✓ Index: ${name} (${indexType})`); | |
| } catch (e: unknown) { | |
| const error = e as Error; | |
| if (!error.message?.includes('already exists') && !error.message?.includes('equivalent')) { | |
| console.log(` ⚠ Index ${name}: ${error.message?.slice(0, 50)}`); | |
| } | |
| } | |
| } | |
| } | |
| // Ensure _syncId index exists for fast lookups | |
| try { | |
| await cloudSession.run('CREATE INDEX sync_id_index IF NOT EXISTS FOR (n:Node) ON (n._syncId)'); | |
| } catch { | |
| // Ignore | |
| } | |
| } finally { | |
| await localSession.close(); | |
| await cloudSession.close(); | |
| } | |
| } | |
| async run(): Promise<SyncStats> { | |
| const startTime = Date.now(); | |
| console.log('═'.repeat(60)); | |
| console.log('⚡ Fast Neo4j Sync'); | |
| console.log('═'.repeat(60)); | |
| console.log(`\nLocal: ${this.config.localUri}`); | |
| console.log(`Cloud: ${this.config.cloudUri}`); | |
| console.log(`Mode: ${this.config.incrementalOnly ? 'Incremental' : 'Full'}`); | |
| console.log(`Batch: ${this.config.batchSize} nodes × ${this.config.parallelBatches} parallel`); | |
| await this.connect(); | |
| let since: Date | undefined; | |
| if (this.config.incrementalOnly) { | |
| const lastSync = await this.getLastSyncTime(); | |
| if (lastSync) { | |
| since = lastSync; | |
| console.log(`\n📅 Last sync: ${lastSync.toISOString()}`); | |
| console.log(' Only syncing changes since then...'); | |
| } else { | |
| console.log('\n⚠️ No previous sync found, doing full sync...'); | |
| } | |
| } | |
| await this.syncSchema(); | |
| await this.syncNodesInParallel(since); | |
| await this.syncRelationshipsInParallel(since); | |
| // Record sync time | |
| await this.setLastSyncTime(new Date()); | |
| this.stats.duration = Date.now() - startTime; | |
| console.log('\n' + '═'.repeat(60)); | |
| console.log('✅ Sync Complete!'); | |
| console.log('═'.repeat(60)); | |
| console.log(` Nodes: ${this.stats.nodesCreated}`); | |
| console.log(` Relationships: ${this.stats.relationshipsCreated}`); | |
| console.log(` Duration: ${(this.stats.duration / 1000).toFixed(1)}s`); | |
| console.log(` Rate: ${Math.round((this.stats.nodesCreated + this.stats.relationshipsCreated) / (this.stats.duration / 1000))} items/sec`); | |
| return this.stats; | |
| } | |
| async close(): Promise<void> { | |
| await this.localDriver.close(); | |
| await this.cloudDriver.close(); | |
| } | |
| } | |
| // CLI | |
| async function main() { | |
| const args = process.argv.slice(2); | |
| const incremental = args.includes('--incremental') || args.includes('-i'); | |
| const help = args.includes('--help') || args.includes('-h'); | |
| if (help) { | |
| console.log(` | |
| Fast Neo4j Sync - Parallel batch sync with incremental support | |
| Usage: npx tsx src/scripts/fast-neo4j-sync.ts [options] | |
| Options: | |
| --incremental, -i Only sync changes since last sync | |
| --help, -h Show this help | |
| Environment variables: | |
| NEO4J_LOCAL_URI Local Neo4j URI (default: bolt://localhost:7687) | |
| NEO4J_LOCAL_USER Local Neo4j user (default: neo4j) | |
| NEO4J_LOCAL_PASSWORD Local Neo4j password | |
| NEO4J_CLOUD_URI Cloud AuraDB URI | |
| NEO4J_CLOUD_USER Cloud user (default: neo4j) | |
| NEO4J_CLOUD_PASSWORD Cloud password | |
| `); | |
| process.exit(0); | |
| } | |
| // Cloud URI from .env.production (NEO4J_URI contains the cloud AuraDB) | |
| const cloudUri = process.env.NEO4J_URI || ''; | |
| const cloudPassword = process.env.NEO4J_PASSWORD || ''; | |
| const syncConfig: SyncConfig = { | |
| localUri: process.env.NEO4J_LOCAL_URI || 'bolt://localhost:7687', | |
| localUser: process.env.NEO4J_LOCAL_USER || 'neo4j', | |
| localPassword: process.env.NEO4J_LOCAL_PASSWORD || 'password', // Local default | |
| cloudUri: cloudUri, | |
| cloudUser: process.env.NEO4J_USER || 'neo4j', | |
| cloudPassword: cloudPassword, | |
| batchSize: 500, // Larger batches | |
| parallelBatches: 4, // 4 parallel operations | |
| incrementalOnly: incremental | |
| }; | |
| if (!syncConfig.cloudUri) { | |
| console.error('❌ NEO4J_URI (cloud) is required in .env.production'); | |
| process.exit(1); | |
| } | |
| const sync = new FastNeo4jSync(syncConfig); | |
| try { | |
| await sync.run(); | |
| } finally { | |
| await sync.close(); | |
| } | |
| } | |
| main().catch(console.error); | |