/**
 * Neo4j Auto-Sync Service
 *
 * Automated synchronization between local Neo4j and AuraDB cloud.
 * Supports full sync, incremental sync, and scheduled execution.
 *
 * - Full sync exports the local schema, nodes and relationships, wipes the
 *   cloud database and re-imports everything.
 * - Incremental sync upserts only nodes whose `createdAt`/`updatedAt` is newer
 *   than the last checkpoint, together with all relationships touching them.
 *   When a detected change cannot be applied that way (node/relationship counts
 *   went down, no timestamped nodes explain the change, or changed nodes have no
 *   `id` property) the service falls back to a full sync.
 *
 * Known limitation: a relationship added between two unchanged nodes, or an
 * untimestamped node added alongside timestamped changes, is not picked up by
 * incremental sync; call `sync(true)` to force a full resync.
 */
import neo4j, { Driver, Session } from 'neo4j-driver';
import cron from 'node-cron';
import fs from 'fs';
import path from 'path';

export interface SyncConfig {
  local: {
    uri: string;
    user: string;
    password: string;
    database: string;
  };
  cloud: {
    uri: string;
    user: string;
    password: string;
    database: string;
  };
  /** Cron expression (node-cron syntax). Defaults to every 6 hours. */
  schedule?: string;
  incrementalEnabled?: boolean;
  /** Number of nodes/relationships handled per progress batch. Defaults to 500. */
  batchSize?: number;
}

export interface SyncStatus {
  lastSyncTime: string | null;
  lastSyncType: 'full' | 'incremental' | null;
  /** Duration of the last successful sync in milliseconds. */
  lastSyncDuration: number | null;
  /** Nodes processed by the last sync. */
  nodesSync: number;
  /** Relationships processed by the last sync. */
  relationshipsSync: number;
  status: 'idle' | 'running' | 'success' | 'error';
  error?: string;
  nextScheduledSync?: string;
}

export interface SyncCheckpoint {
  /** ISO time at which the sync that wrote this checkpoint started. */
  timestamp: string;
  nodeCount: number;
  relationshipCount: number;
  lastNodeIds: string[];
}

const CHECKPOINT_FILE = path.join(process.cwd(), '.neo4j-sync-checkpoint.json');
const STATUS_FILE = path.join(process.cwd(), '.neo4j-sync-status.json');

/** Pooled connections are recycled after 3 hours. */
const MAX_CONNECTION_LIFETIME_MS = 3 * 60 * 60 * 1000;
/** How many recently synced node ids are kept in SyncCheckpoint.lastNodeIds. */
const CHECKPOINT_ID_HISTORY = 100;
const NODE_PROGRESS_INTERVAL = 5000;
const RELATIONSHIP_PROGRESS_INTERVAL = 50000;
/** Cypher predicate (on variable `n`) selecting nodes touched after `$checkpointTime`. */
const CHANGED_SINCE_PREDICATE =
  'n.createdAt > datetime($checkpointTime) OR n.updatedAt > datetime($checkpointTime)';

type ConnectionConfig = SyncConfig['local'];
type ResolvedSyncConfig = SyncConfig &
  Required<Pick<SyncConfig, 'schedule' | 'incrementalEnabled' | 'batchSize'>>;

// Property values are arbitrary Neo4j values (primitives, lists, driver temporal/Integer types).
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type PropertyMap = Record<string, any>;

interface ExportedNode {
  labels: string[];
  properties: PropertyMap;
  elementId: string;
}

/** Identity of a relationship endpoint as seen in the source graph. */
interface NodeRef {
  id: unknown;
  labels: string[];
}

interface ExportedRelationship {
  type: string;
  properties: PropertyMap;
  startElementId: string;
  endElementId: string;
  /** Fallback endpoint identities, used when an endpoint is not among the imported nodes. */
  start?: NodeRef;
  end?: NodeRef;
}

interface SchemaItem {
  type: 'constraint' | 'index';
  name: string;
  definition: string;
}

interface GraphCounts {
  nodeCount: number;
  relationshipCount: number;
}

interface ChangeSummary extends GraphCounts {
  changed: boolean;
  countsDecreased: boolean;
}

/** Backtick-quote a Cypher identifier (label, type, property or schema name); inner backticks are doubled. */
function quoteIdentifier(name: string): string {
  return '`' + name.replace(/`/g, '``') + '`';
}

/** `:`A`:`B`` for the given labels, or '' when there are none (keeps MERGE/MATCH patterns valid). */
function labelPattern(labels: readonly string[]): string {
  return labels.map((label) => ':' + quoteIdentifier(label)).join('');
}

/** Comma-separated, quoted property references on a Cypher variable, e.g. "n.`a`, n.`b`". */
function propertyRefs(variable: string, properties: readonly string[]): string {
  return properties.map((prop) => `${variable}.${quoteIdentifier(prop)}`).join(', ');
}

function errorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/** True when a batch [start, end) crosses a multiple of `every`, or is the final batch. */
function crossedProgressMark(start: number, end: number, total: number, every: number): boolean {
  return end >= total || Math.floor(end / every) > Math.floor(start / every);
}

function checkpointTimeParam(checkpoint: SyncCheckpoint): string {
  return new Date(checkpoint.timestamp).toISOString();
}

/** Minimal shape check so a corrupt checkpoint file triggers a full sync instead of a crash. */
function isSyncCheckpoint(value: unknown): value is SyncCheckpoint {
  if (typeof value !== 'object' || value === null) return false;
  const candidate = value as Partial<SyncCheckpoint>;
  return (
    typeof candidate.timestamp === 'string' &&
    !Number.isNaN(Date.parse(candidate.timestamp)) &&
    typeof candidate.nodeCount === 'number' &&
    typeof candidate.relationshipCount === 'number' &&
    Array.isArray(candidate.lastNodeIds)
  );
}

export class Neo4jAutoSync {
  private readonly config: ResolvedSyncConfig;
  private localDriver: Driver | null = null;
  private cloudDriver: Driver | null = null;
  private cronJob: cron.ScheduledTask | null = null;
  /** Prevents overlapping runs (e.g. a cron tick during a manual sync) from sharing and closing each other's drivers. */
  private syncInProgress = false;
  private status: SyncStatus = {
    lastSyncTime: null,
    lastSyncType: null,
    lastSyncDuration: null,
    nodesSync: 0,
    relationshipsSync: 0,
    status: 'idle'
  };

  /**
   * @param config Overrides; missing values fall back to environment variables, then built-in defaults.
   */
  constructor(config?: Partial<SyncConfig>) {
    const requestedBatchSize = config?.batchSize;
    this.config = {
      local: {
        uri: config?.local?.uri || process.env.NEO4J_LOCAL_URI || 'bolt://localhost:7687',
        user: config?.local?.user || process.env.NEO4J_LOCAL_USER || 'neo4j',
        password: config?.local?.password || process.env.NEO4J_LOCAL_PASSWORD || 'password',
        database: config?.local?.database || process.env.NEO4J_LOCAL_DATABASE || 'neo4j'
      },
      cloud: {
        uri: config?.cloud?.uri || process.env.NEO4J_URI || '',
        user: config?.cloud?.user || process.env.NEO4J_USER || 'neo4j',
        password: config?.cloud?.password || process.env.NEO4J_PASSWORD || '',
        database: config?.cloud?.database || process.env.NEO4J_DATABASE || 'neo4j'
      },
      schedule: config?.schedule || process.env.NEO4J_SYNC_SCHEDULE || '0 */6 * * *', // Every 6 hours
      incrementalEnabled: config?.incrementalEnabled ?? true,
      // A non-positive or fractional size would stall the batch loops.
      batchSize:
        requestedBatchSize && requestedBatchSize >= 1 ? Math.floor(requestedBatchSize) : 500
    };
    this.loadStatus();
  }

  /** Restore the persisted status (best-effort); a leftover 'running' state is reset to 'idle'. */
  private loadStatus(): void {
    try {
      if (!fs.existsSync(STATUS_FILE)) return;
      const saved = JSON.parse(fs.readFileSync(STATUS_FILE, 'utf-8')) as Partial<SyncStatus>;
      // Merge over the defaults so a partial/older file cannot drop required fields.
      this.status = { ...this.status, ...saved, status: 'idle' };
    } catch {
      // Unreadable or corrupt status file: keep the default status.
    }
  }

  private saveStatus(): void {
    try {
      fs.writeFileSync(STATUS_FILE, JSON.stringify(this.status, null, 2));
    } catch (error: unknown) {
      console.error('Failed to save sync status:', error);
    }
  }

  /** Returns null when there is no usable checkpoint, which forces a full sync. */
  private loadCheckpoint(): SyncCheckpoint | null {
    try {
      if (!fs.existsSync(CHECKPOINT_FILE)) return null;
      const parsed: unknown = JSON.parse(fs.readFileSync(CHECKPOINT_FILE, 'utf-8'));
      return isSyncCheckpoint(parsed) ? parsed : null;
    } catch {
      return null;
    }
  }

  private saveCheckpoint(checkpoint: SyncCheckpoint): void {
    try {
      fs.writeFileSync(CHECKPOINT_FILE, JSON.stringify(checkpoint, null, 2));
    } catch (error: unknown) {
      console.error('Failed to save checkpoint:', error);
    }
  }

  private createDriver(target: ConnectionConfig): Driver {
    return neo4j.driver(target.uri, neo4j.auth.basic(target.user, target.password), {
      maxConnectionLifetime: MAX_CONNECTION_LIFETIME_MS
    });
  }

  /** Open (and verify) drivers for both databases; any previously open drivers are closed first. */
  async connect(): Promise<void> {
    console.log('📡 Connecting to Neo4j instances...');
    await this.disconnect();

    this.localDriver = this.createDriver(this.config.local);
    await this.localDriver.verifyConnectivity();
    console.log(' ✓ Connected to local Neo4j');

    this.cloudDriver = this.createDriver(this.config.cloud);
    await this.cloudDriver.verifyConnectivity();
    console.log(' ✓ Connected to AuraDB cloud');
  }

  /** Close both drivers; both are closed even if one fails, and the first failure is rethrown. */
  async disconnect(): Promise<void> {
    const drivers = [this.localDriver, this.cloudDriver].filter(
      (driver): driver is Driver => driver !== null
    );
    this.localDriver = null;
    this.cloudDriver = null;
    const results = await Promise.allSettled(drivers.map((driver) => driver.close()));
    const failure = results.find(
      (result): result is PromiseRejectedResult => result.status === 'rejected'
    );
    if (failure) throw failure.reason;
  }

  private openLocalSession(): Session {
    if (!this.localDriver) throw new Error('Not connected to local Neo4j; call connect() first');
    return this.localDriver.session({ database: this.config.local.database });
  }

  private openCloudSession(): Session {
    if (!this.cloudDriver) throw new Error('Not connected to AuraDB cloud; call connect() first');
    return this.cloudDriver.session({ database: this.config.cloud.database });
  }

  /**
   * Start scheduled auto-sync (replacing any running schedule).
   */
  startScheduler(): void {
    this.cronJob?.stop();
    const schedule = this.config.schedule;

    console.log(`⏰ Starting Neo4j auto-sync scheduler: ${schedule}`);

    this.cronJob = cron.schedule(schedule, async () => {
      console.log(`\n🔄 [${new Date().toISOString()}] Scheduled sync starting...`);
      try {
        await this.sync();
      } catch (error: unknown) {
        console.error('Scheduled sync failed:', error);
      }
      // Keep the advertised next run current (unless the scheduler was stopped meanwhile).
      if (this.cronJob) this.refreshNextScheduledSync();
    });

    const nextRun = this.refreshNextScheduledSync();
    console.log(` Next sync: ${nextRun}`);
  }

  stopScheduler(): void {
    if (!this.cronJob) return;
    this.cronJob.stop();
    this.cronJob = null;
    delete this.status.nextScheduledSync;
    this.saveStatus();
    console.log('⏹️ Scheduler stopped');
  }

  private refreshNextScheduledSync(): string {
    const nextRun = this.getNextScheduledTime();
    this.status.nextScheduledSync = nextRun;
    this.saveStatus();
    return nextRun;
  }

  /**
   * Next run time for the common "every 6 hours at minute M" pattern (`M *\/6 * * *`, local time);
   * any other expression is returned as a descriptive string.
   */
  private getNextScheduledTime(): string {
    const schedule = this.config.schedule;
    const fields = schedule.trim().split(/\s+/);
    const [minute = '', hour = '', dayOfMonth = '', month = '', dayOfWeek = ''] = fields;
    const isEverySixHours =
      fields.length === 5 &&
      /^\d+$/.test(minute) &&
      hour === '*/6' &&
      dayOfMonth === '*' &&
      month === '*' &&
      dayOfWeek === '*';
    if (!isEverySixHours) {
      return 'See cron expression: ' + schedule;
    }

    const now = new Date();
    const next = new Date(now);
    // Start of the current 6-hour window, then step forward until strictly in the future.
    next.setHours(Math.floor(now.getHours() / 6) * 6, Number(minute), 0, 0);
    while (next <= now) {
      next.setHours(next.getHours() + 6);
    }
    return next.toISOString();
  }

  /**
   * Perform sync (auto-detects full vs incremental).
   *
   * Errors are not thrown; they are recorded in the returned status. If a sync is
   * already running the call is ignored and the current status is returned.
   *
   * @param forceFullSync Skip change detection and do a full resync.
   * @returns The live status object of this instance.
   */
  async sync(forceFullSync: boolean = false): Promise<SyncStatus> {
    if (this.syncInProgress) {
      console.warn('⚠ Sync already in progress, skipping this request');
      return this.status;
    }
    this.syncInProgress = true;
    const startTime = Date.now();
    // Checkpoints record when this run *started* so edits made while it runs are
    // still newer than the checkpoint and get picked up next time.
    const syncStartedAt = new Date(startTime).toISOString();
    this.status.status = 'running';
    this.saveStatus();

    try {
      await this.connect();

      const checkpoint = this.loadCheckpoint();
      if (forceFullSync || !checkpoint || !this.config.incrementalEnabled) {
        await this.fullSync(syncStartedAt);
        this.status.lastSyncType = 'full';
      } else {
        this.status.lastSyncType = await this.syncChangesSince(checkpoint, syncStartedAt);
      }

      this.status.status = 'success';
      this.status.lastSyncTime = new Date().toISOString();
      this.status.lastSyncDuration = Date.now() - startTime;
      delete this.status.error;
    } catch (error: unknown) {
      const message = errorMessage(error);
      this.status.status = 'error';
      this.status.error = message;
      console.error('❌ Sync failed:', message);
    } finally {
      try {
        await this.disconnect();
      } catch (error: unknown) {
        console.error('Failed to close Neo4j connections:', error);
      }
      this.syncInProgress = false;
      this.saveStatus();
    }

    return this.status;
  }

  /**
   * Decide between skipping, incremental sync and a full-sync fallback.
   * @returns The kind of sync that actually ran.
   */
  private async syncChangesSince(
    checkpoint: SyncCheckpoint,
    syncStartedAt: string
  ): Promise<'full' | 'incremental'> {
    const changes = await this.checkForChanges(checkpoint);
    if (!changes.changed) {
      console.log('✓ No changes detected, skipping sync');
      this.status.nodesSync = 0;
      this.status.relationshipsSync = 0;
      return 'incremental';
    }
    if (changes.countsDecreased) {
      console.log('⚠ Deletions detected; incremental sync cannot remove data, running FULL sync');
      await this.fullSync(syncStartedAt);
      return 'full';
    }
    if (await this.incrementalSync(checkpoint, changes, syncStartedAt)) {
      return 'incremental';
    }
    console.log('⚠ Changes could not be applied incrementally, running FULL sync');
    await this.fullSync(syncStartedAt);
    return 'full';
  }

  private async countGraph(session: Session): Promise<GraphCounts> {
    const nodeResult = await session.run('MATCH (n) RETURN count(n) as count');
    const relResult = await session.run('MATCH ()-[r]->() RETURN count(r) as count');
    return {
      nodeCount: nodeResult.records[0]?.get('count')?.toNumber() ?? 0,
      relationshipCount: relResult.records[0]?.get('count')?.toNumber() ?? 0
    };
  }

  /**
   * Compare the local graph with the checkpoint: counts plus a probe for nodes
   * whose createdAt/updatedAt is newer than the checkpoint (catches pure updates).
   */
  private async checkForChanges(checkpoint: SyncCheckpoint): Promise<ChangeSummary> {
    const session = this.openLocalSession();
    try {
      const counts = await this.countGraph(session);
      const touched = await session.run(
        `MATCH (n) WHERE ${CHANGED_SINCE_PREDICATE} RETURN elementId(n) as elementId LIMIT 1`,
        { checkpointTime: checkpointTimeParam(checkpoint) }
      );

      const countsChanged =
        counts.nodeCount !== checkpoint.nodeCount ||
        counts.relationshipCount !== checkpoint.relationshipCount;
      const hasTimestampedChanges = touched.records.length > 0;

      if (countsChanged) {
        console.log(`📊 Changes detected: nodes ${checkpoint.nodeCount} → ${counts.nodeCount}, rels ${checkpoint.relationshipCount} → ${counts.relationshipCount}`);
      } else if (hasTimestampedChanges) {
        console.log('📊 Changes detected: nodes modified since last checkpoint');
      }

      return {
        ...counts,
        changed: countsChanged || hasTimestampedChanges,
        countsDecreased:
          counts.nodeCount < checkpoint.nodeCount ||
          counts.relationshipCount < checkpoint.relationshipCount
      };
    } finally {
      await session.close();
    }
  }

  /** Replace the cloud database with a complete copy of the local one. */
  private async fullSync(syncStartedAt: string): Promise<void> {
    console.log('\n🔄 Starting FULL sync...');

    const localSession = this.openLocalSession();
    const cloudSession = this.openCloudSession();

    try {
      const schema = await this.exportSchema(localSession);
      const nodes = await this.exportNodes(localSession);
      const relationships = await this.exportRelationships(localSession);

      // NOTE: from here on the cloud copy is incomplete until the import finishes.
      await this.clearCloud(cloudSession);
      await this.applySchema(cloudSession, schema);
      // importNodes assigns synthetic ids to id-less nodes; importRelationships relies on that.
      await this.importNodes(cloudSession, nodes);
      await this.importRelationships(cloudSession, relationships, nodes);

      this.saveCheckpoint({
        timestamp: syncStartedAt,
        nodeCount: nodes.length,
        relationshipCount: relationships.length,
        lastNodeIds: nodes.slice(-CHECKPOINT_ID_HISTORY).map((node) => node.properties.id)
      });

      this.status.nodesSync = nodes.length;
      this.status.relationshipsSync = relationships.length;

      console.log(`\n✅ Full sync complete: ${nodes.length} nodes, ${relationships.length} relationships`);
    } finally {
      await localSession.close();
      await cloudSession.close();
    }
  }

  /**
   * Upsert nodes changed since the checkpoint plus every relationship touching them.
   * Requires nodes to carry a `createdAt` or `updatedAt` datetime property.
   *
   * @returns false when the change cannot be applied incrementally (no timestamped
   *          nodes found, or changed nodes lack an `id`); nothing is written then.
   */
  private async incrementalSync(
    checkpoint: SyncCheckpoint,
    current: GraphCounts,
    syncStartedAt: string
  ): Promise<boolean> {
    console.log('\n🔄 Starting INCREMENTAL sync...');

    const localSession = this.openLocalSession();
    const cloudSession = this.openCloudSession();

    try {
      const changedResult = await localSession.run(
        `MATCH (n) WHERE ${CHANGED_SINCE_PREDICATE} RETURN n, elementId(n) as elementId`,
        { checkpointTime: checkpointTimeParam(checkpoint) }
      );
      const changedNodes = changedResult.records.map((record) =>
        this.toExportedNode(record.get('n'), record.get('elementId'))
      );

      if (changedNodes.length === 0) {
        console.log(' No timestamped node changes found');
        return false;
      }
      if (changedNodes.some((node) => node.properties.id == null)) {
        // importNodes would invent a fresh id on every run and duplicate such nodes in the cloud.
        console.log(' Changed nodes without an id property cannot be matched in the cloud');
        return false;
      }

      console.log(` Found ${changedNodes.length} new/modified nodes`);
      await this.importNodes(cloudSession, changedNodes);

      // Start from the changed nodes (elementId lookup) and collect each touching
      // relationship once, with endpoint identities for endpoints that did not change.
      const relResult = await localSession.run(
        `
        UNWIND $elementIds AS changedId
        MATCH (n) WHERE elementId(n) = changedId
        MATCH (n)-[r]-()
        WITH DISTINCT r
        WITH r, startNode(r) AS a, endNode(r) AS b
        RETURN r, type(r) as type,
               elementId(a) as startId, elementId(b) as endId,
               a.id as startNodeId, b.id as endNodeId,
               labels(a) as startLabels, labels(b) as endLabels
        `,
        { elementIds: changedNodes.map((node) => node.elementId) }
      );
      const changedRels: ExportedRelationship[] = relResult.records.map((record) => ({
        type: record.get('type'),
        properties: this.convertProperties(record.get('r').properties),
        startElementId: record.get('startId'),
        endElementId: record.get('endId'),
        start: { id: this.convertValue(record.get('startNodeId')), labels: record.get('startLabels') },
        end: { id: this.convertValue(record.get('endNodeId')), labels: record.get('endLabels') }
      }));

      if (changedRels.length > 0) {
        await this.importRelationships(cloudSession, changedRels, changedNodes);
      }

      this.saveCheckpoint({
        timestamp: syncStartedAt,
        nodeCount: current.nodeCount,
        relationshipCount: current.relationshipCount,
        lastNodeIds: [
          ...checkpoint.lastNodeIds,
          ...changedNodes.map((node) => node.properties.id)
        ].slice(-CHECKPOINT_ID_HISTORY)
      });

      this.status.nodesSync = changedNodes.length;
      this.status.relationshipsSync = changedRels.length;

      console.log(`✅ Incremental sync complete: ${changedNodes.length} nodes, ${changedRels.length} relationships`);
      return true;
    } finally {
      await localSession.close();
      await cloudSession.close();
    }
  }

  private toExportedNode(
    node: { labels: string[]; properties: PropertyMap },
    elementId: string
  ): ExportedNode {
    return {
      labels: [...node.labels],
      properties: this.convertProperties(node.properties),
      elementId
    };
  }

  /** Convert driver-specific property values into plain values for re-import. */
  private convertProperties(props: PropertyMap): PropertyMap {
    const result: PropertyMap = {};
    for (const [key, value] of Object.entries(props)) {
      result[key] = this.convertValue(value);
    }
    return result;
  }

  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private convertValue(value: any): any {
    if (value === null || value === undefined) return value;
    if (Array.isArray(value)) return value.map((item) => this.convertValue(item));
    if (typeof value === 'object') {
      const typeName = value.constructor?.name;
      if (typeName === 'Integer') {
        // toNumber() silently loses precision beyond 2^53; such values stay driver
        // Integers, which the driver accepts back as query parameters.
        return value.inSafeRange() ? value.toNumber() : value;
      }
      if (typeName === 'DateTime' || typeName === 'Date') {
        // NOTE(review): stored in the cloud as a string, not a temporal value — confirm intended.
        return value.toString();
      }
    }
    return value;
  }

  /** Collect recreatable node uniqueness constraints and standalone node RANGE indexes. */
  private async exportSchema(session: Session): Promise<SchemaItem[]> {
    const schema: SchemaItem[] = [];

    const constraintResult = await session.run('SHOW CONSTRAINTS');
    for (const record of constraintResult.records) {
      const type = record.get('type');
      // Older servers report node uniqueness as 'UNIQUENESS'; newer 5.x releases are
      // assumed to report 'NODE_PROPERTY_UNIQUENESS' — accept both.
      const isNodeUniqueness =
        record.get('entityType') === 'NODE' &&
        (type === 'UNIQUENESS' || type === 'NODE_PROPERTY_UNIQUENESS');
      const label: string | undefined = record.get('labelsOrTypes')?.[0];
      const properties: string[] = record.get('properties') ?? [];
      if (!isNodeUniqueness || !label || properties.length === 0) continue;

      const name: string = record.get('name');
      // Keep every property of a composite constraint: constraining only the first
      // one would be stricter than the source and reject valid imports.
      const target =
        properties.length === 1
          ? propertyRefs('n', properties)
          : `(${propertyRefs('n', properties)})`;
      schema.push({
        type: 'constraint',
        name,
        definition: `CREATE CONSTRAINT ${quoteIdentifier(name)} IF NOT EXISTS FOR (n${labelPattern([label])}) REQUIRE ${target} IS UNIQUE`
      });
    }

    const indexResult = await session.run('SHOW INDEXES');
    for (const record of indexResult.records) {
      // Skip indexes owned by a constraint; creating the constraint is expected to recreate them.
      if (record.get('owningConstraint')) continue;

      const label: string | undefined = record.get('labelsOrTypes')?.[0];
      const properties: string[] = record.get('properties') ?? [];
      const isNodeRangeIndex =
        record.get('type') === 'RANGE' && record.get('entityType') === 'NODE';
      if (!isNodeRangeIndex || !label || properties.length === 0) continue;

      const name: string = record.get('name');
      schema.push({
        type: 'index',
        name,
        definition: `CREATE INDEX ${quoteIdentifier(name)} IF NOT EXISTS FOR (n${labelPattern([label])}) ON (${propertyRefs('n', properties)})`
      });
    }

    return schema;
  }

  private async exportNodes(session: Session): Promise<ExportedNode[]> {
    console.log('📦 Exporting nodes...');
    const result = await session.run('MATCH (n) RETURN n, elementId(n) as elementId');
    const nodes = result.records.map((record) =>
      this.toExportedNode(record.get('n'), record.get('elementId'))
    );
    console.log(` ✓ Exported ${nodes.length} nodes`);
    return nodes;
  }

  private async exportRelationships(session: Session): Promise<ExportedRelationship[]> {
    console.log('🔗 Exporting relationships...');
    const result = await session.run(`
      MATCH (a)-[r]->(b)
      RETURN r, type(r) as type, elementId(a) as startId, elementId(b) as endId
    `);
    const rels = result.records.map((record) => ({
      type: record.get('type'),
      properties: this.convertProperties(record.get('r').properties),
      startElementId: record.get('startId'),
      endElementId: record.get('endId')
    }));
    console.log(` ✓ Exported ${rels.length} relationships`);
    return rels;
  }

  /** Delete every node (and its relationships) in the cloud, 1000 at a time. */
  private async clearCloud(session: Session): Promise<void> {
    console.log('🗑️ Clearing cloud database...');
    let deleted = 0;
    do {
      const result = await session.run(`
        MATCH (n)
        WITH n LIMIT 1000
        DETACH DELETE n
        RETURN count(*) as deleted
      `);
      deleted = result.records[0]?.get('deleted')?.toNumber() || 0;
    } while (deleted > 0);
    console.log(' ✓ Cloud cleared');
  }

  /** Run each schema statement; failures are logged and do not abort the sync. */
  private async applySchema(session: Session, schema: SchemaItem[]): Promise<void> {
    console.log('📋 Applying schema...');
    for (const item of schema) {
      try {
        await session.run(item.definition);
      } catch (error: unknown) {
        const message = errorMessage(error);
        if (!message.includes('already exists')) {
          console.warn(` ⚠ ${item.name}: ${message.slice(0, 50)}`);
        }
      }
    }
    console.log(` ✓ Applied ${schema.length} schema items`);
  }

  /**
   * Upsert nodes into the cloud by `id` (MERGE on labels + id, then replace properties).
   * Nodes without an `id` get a synthetic one written back into `node.properties`
   * (intentional mutation: importRelationships resolves endpoints through it).
   * Per-node failures are counted and reported, not thrown.
   *
   * @returns Number of nodes written successfully.
   */
  private async importNodes(session: Session, nodes: ExportedNode[]): Promise<number> {
    console.log('📦 Importing nodes...');
    const batchSize = this.config.batchSize;
    let imported = 0;
    let failed = 0;
    let firstError: string | undefined;

    for (let start = 0; start < nodes.length; start += batchSize) {
      const end = Math.min(start + batchSize, nodes.length);
      for (const node of nodes.slice(start, end)) {
        if (node.properties.id == null) {
          node.properties.id = `node_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
        }
        const nodeId = node.properties.id;
        try {
          await session.run(
            `MERGE (n${labelPattern(node.labels)} {id: $nodeId}) SET n = $props`,
            { nodeId, props: node.properties }
          );
          imported++;
        } catch (error: unknown) {
          failed++;
          if (firstError === undefined) firstError = errorMessage(error);
        }
      }
      if (crossedProgressMark(start, end, nodes.length, NODE_PROGRESS_INTERVAL)) {
        console.log(` Progress: ${end}/${nodes.length} nodes`);
      }
    }

    if (failed > 0) {
      console.warn(` ⚠ ${failed} nodes failed to import (first error: ${firstError})`);
    }
    console.log(` ✓ Imported ${imported} nodes`);
    return imported;
  }

  /**
   * Upsert relationships between cloud nodes located by labels + `id`.
   * Endpoints are resolved from `nodes` first (so synthetic ids assigned by
   * importNodes apply), then from the relationship's own start/end identities.
   * Unresolvable endpoints, unmatched endpoints and failures are counted and reported.
   *
   * @returns Number of relationships written successfully.
   */
  private async importRelationships(
    session: Session,
    relationships: ExportedRelationship[],
    nodes: ExportedNode[]
  ): Promise<number> {
    console.log('🔗 Importing relationships...');

    const knownNodes = new Map<string, NodeRef>(
      nodes.map((node) => [node.elementId, { id: node.properties.id, labels: node.labels }])
    );

    const batchSize = this.config.batchSize;
    let imported = 0;
    let skipped = 0;
    let unmatched = 0;
    let failed = 0;
    let firstError: string | undefined;

    for (let start = 0; start < relationships.length; start += batchSize) {
      const end = Math.min(start + batchSize, relationships.length);
      for (const rel of relationships.slice(start, end)) {
        const from = knownNodes.get(rel.startElementId) ?? rel.start;
        const to = knownNodes.get(rel.endElementId) ?? rel.end;
        if (!from || !to || from.id == null || to.id == null) {
          skipped++;
          continue;
        }

        try {
          const result = await session.run(
            `
            MATCH (a${labelPattern(from.labels)} {id: $startId})
            MATCH (b${labelPattern(to.labels)} {id: $endId})
            MERGE (a)-[r:${quoteIdentifier(rel.type)}]->(b)
            SET r = $props
            RETURN count(r) as merged
            `,
            { startId: from.id, endId: to.id, props: rel.properties }
          );
          // No matching endpoints means nothing was written, which is not a success.
          if ((result.records[0]?.get('merged')?.toNumber() ?? 0) > 0) {
            imported++;
          } else {
            unmatched++;
          }
        } catch (error: unknown) {
          failed++;
          if (firstError === undefined) firstError = errorMessage(error);
        }
      }
      if (crossedProgressMark(start, end, relationships.length, RELATIONSHIP_PROGRESS_INTERVAL)) {
        console.log(` Progress: ${end}/${relationships.length} relationships`);
      }
    }

    if (skipped > 0) {
      console.warn(` ⚠ ${skipped} relationships skipped (endpoint has no id)`);
    }
    if (unmatched > 0) {
      console.warn(` ⚠ ${unmatched} relationships skipped (endpoint not found in cloud)`);
    }
    if (failed > 0) {
      console.warn(` ⚠ ${failed} relationships failed to import (first error: ${firstError})`);
    }
    console.log(` ✓ Imported ${imported} relationships`);
    return imported;
  }

  /** Snapshot (shallow copy) of the current status. */
  getStatus(): SyncStatus {
    return { ...this.status };
  }
}

// Singleton instance
let autoSyncInstance: Neo4jAutoSync | null = null;

/** Process-wide shared instance, configured from environment variables. */
export function getNeo4jAutoSync(): Neo4jAutoSync {
  if (!autoSyncInstance) {
    autoSyncInstance = new Neo4jAutoSync();
  }
  return autoSyncInstance;
}