Spaces:
Paused
Paused
| /** | |
| * Neo4j Auto-Sync Service | |
| * | |
| * Automated synchronization between local Neo4j and AuraDB cloud. | |
| * Supports full sync, incremental sync, and scheduled execution. | |
| */ | |
| import neo4j, { Driver, Session } from 'neo4j-driver'; | |
| import cron from 'node-cron'; | |
| import fs from 'fs'; | |
| import path from 'path'; | |
/**
 * Settings for a local → cloud Neo4j sync.
 *
 * Optional fields are filled in by the `Neo4jAutoSync` constructor when omitted.
 */
export interface SyncConfig {
  /** Source database: the local Neo4j instance. */
  local: {
    /** Bolt/neo4j URI of the server. */
    uri: string;
    /** Basic-auth user name. */
    user: string;
    /** Basic-auth password. */
    password: string;
    /** Database name on that server. */
    database: string;
  };
  /** Target database: the AuraDB cloud instance. */
  cloud: {
    /** Bolt/neo4j URI of the server. */
    uri: string;
    /** Basic-auth user name. */
    user: string;
    /** Basic-auth password. */
    password: string;
    /** Database name on that server. */
    database: string;
  };
  /** Cron expression used by the scheduler. */
  schedule?: string;
  /** Whether incremental (checkpoint-based) syncs may be attempted. */
  incrementalEnabled?: boolean;
  /** Number of records handled per import batch. */
  batchSize?: number;
}
/**
 * Persisted outcome of the most recent sync, plus scheduler information.
 */
export interface SyncStatus {
  /** ISO timestamp of the last successful sync, or null if none has completed. */
  lastSyncTime: string | null;
  /** Kind of the last successful sync, or null if none has completed. */
  lastSyncType: 'full' | 'incremental' | null;
  /** Wall-clock duration of the last successful sync in milliseconds, or null. */
  lastSyncDuration: number | null;
  /** Number of nodes handled by the last sync. */
  nodesSync: number;
  /** Number of relationships handled by the last sync. */
  relationshipsSync: number;
  /** Current lifecycle state. */
  status: 'idle' | 'running' | 'success' | 'error';
  /** Message of the last failure; absent after a successful sync. */
  error?: string;
  /** Human-readable preview of the next scheduled run, when a scheduler is active. */
  nextScheduledSync?: string;
}
/**
 * Snapshot of the local graph taken at the end of a sync; it is the baseline
 * against which the next sync decides whether anything changed.
 */
export interface SyncCheckpoint {
  /** ISO timestamp the checkpoint refers to. */
  timestamp: string;
  /** Local node count at checkpoint time. */
  nodeCount: number;
  /** Local relationship count at checkpoint time. */
  relationshipCount: number;
  /** A bounded list of recently synced node `id` values. */
  lastNodeIds: string[];
}
// Both state files are resolved against the process working directory, so the
// persisted checkpoint/status are only found again when the service is started
// from the same directory.
const CHECKPOINT_FILE = path.resolve('.neo4j-sync-checkpoint.json');
const STATUS_FILE = path.resolve('.neo4j-sync-status.json');
/** A node read from the local database. */
interface ExportedNode {
  labels: string[];
  properties: Record<string, unknown>;
  /** elementId on the *local* server; only meaningful within a single sync run. */
  elementId: string;
}

/** A relationship read from the local database. */
interface ExportedRelationship {
  type: string;
  properties: Record<string, unknown>;
  startElementId: string;
  endElementId: string;
  /** Endpoint `id` values, when the export query returned them (incremental sync). */
  startNodeId?: unknown;
  endNodeId?: unknown;
}

/** A schema statement to replay on the cloud database. */
interface SchemaItem {
  type: 'constraint' | 'index';
  name: string;
  definition: string;
}

/** Result of one UNWIND batch, including any per-row fallback. */
interface BatchOutcome {
  written: number;
  failed: number;
  firstError?: string;
}

/**
 * Constructor options: like {@link SyncConfig}, but every field — including the
 * individual connection settings — may be omitted and falls back to env/defaults.
 */
export type Neo4jAutoSyncOptions = Omit<Partial<SyncConfig>, 'local' | 'cloud'> & {
  local?: Partial<SyncConfig['local']>;
  cloud?: Partial<SyncConfig['cloud']>;
};

/**
 * Mirrors a local Neo4j database into an AuraDB cloud database.
 *
 * A full sync wipes the cloud database and re-imports schema, nodes and
 * relationships. An incremental sync upserts only nodes whose `createdAt` /
 * `updatedAt` properties are newer than the last checkpoint, and reports back
 * so the caller falls back to a full sync whenever that cannot reproduce the
 * local state (deletions, untimestamped nodes, nodes without `id`, too many
 * changes, or a cloud/local count mismatch afterwards).
 *
 * Status and checkpoint are persisted as JSON in STATUS_FILE / CHECKPOINT_FILE.
 */
export class Neo4jAutoSync {
  private static readonly DEFAULT_SCHEDULE = '0 */6 * * *'; // every 6 hours
  private static readonly DEFAULT_BATCH_SIZE = 500;
  /** Maximum changed nodes an incremental sync handles before deferring to a full sync. */
  private static readonly INCREMENTAL_NODE_LIMIT = 10000;
  /** How many recent node ids are kept in the checkpoint's `lastNodeIds`. */
  private static readonly CHECKPOINT_ID_HISTORY = 100;
  /**
   * `SHOW CONSTRAINTS` type values treated as node uniqueness constraints.
   * NOTE(review): older 5.x servers report 'UNIQUENESS'; newer releases are believed
   * to report 'NODE_PROPERTY_UNIQUENESS'. Both are accepted — confirm for your server.
   */
  private static readonly NODE_UNIQUENESS_TYPES = new Set(['UNIQUENESS', 'NODE_PROPERTY_UNIQUENESS']);

  private config: SyncConfig & Required<Pick<SyncConfig, 'schedule' | 'incrementalEnabled' | 'batchSize'>>;
  private localDriver: Driver | null = null;
  private cloudDriver: Driver | null = null;
  private cronJob: cron.ScheduledTask | null = null;
  /** The sync currently executing, if any; prevents overlapping runs from sharing drivers. */
  private inFlightSync: Promise<SyncStatus> | null = null;
  private status: SyncStatus = {
    lastSyncTime: null,
    lastSyncType: null,
    lastSyncDuration: null,
    nodesSync: 0,
    relationshipsSync: 0,
    status: 'idle'
  };

  /**
   * @param config Overrides; each missing value falls back to the NEO4J_* environment
   *   variables and then to built-in defaults.
   */
  constructor(config?: Neo4jAutoSyncOptions) {
    const requestedBatchSize = config?.batchSize;
    this.config = {
      local: {
        uri: config?.local?.uri || process.env.NEO4J_LOCAL_URI || 'bolt://localhost:7687',
        user: config?.local?.user || process.env.NEO4J_LOCAL_USER || 'neo4j',
        password: config?.local?.password || process.env.NEO4J_LOCAL_PASSWORD || 'password',
        database: config?.local?.database || process.env.NEO4J_LOCAL_DATABASE || 'neo4j'
      },
      cloud: {
        uri: config?.cloud?.uri || process.env.NEO4J_URI || '',
        user: config?.cloud?.user || process.env.NEO4J_USER || 'neo4j',
        password: config?.cloud?.password || process.env.NEO4J_PASSWORD || '',
        database: config?.cloud?.database || process.env.NEO4J_DATABASE || 'neo4j'
      },
      schedule: config?.schedule || process.env.NEO4J_SYNC_SCHEDULE || Neo4jAutoSync.DEFAULT_SCHEDULE,
      incrementalEnabled: config?.incrementalEnabled ?? true,
      // A zero, negative or NaN batch size would make the batching loops never advance.
      batchSize:
        requestedBatchSize !== undefined && Number.isFinite(requestedBatchSize) && requestedBatchSize >= 1
          ? Math.floor(requestedBatchSize)
          : Neo4jAutoSync.DEFAULT_BATCH_SIZE
    };
    this.loadStatus();
  }

  // ---------------------------------------------------------------- persistence

  /** Restores the persisted status, merging it onto the defaults; unreadable files are ignored. */
  private loadStatus(): void {
    try {
      if (!fs.existsSync(STATUS_FILE)) return;
      const parsed: unknown = JSON.parse(fs.readFileSync(STATUS_FILE, 'utf-8'));
      // Reject `null`, arrays and primitives so `this.status` always stays a full object.
      if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) return;
      this.status = { ...this.status, ...(parsed as Partial<SyncStatus>) };
      // 'running' was written by a previous process; nothing is running in this one yet.
      if (this.status.status === 'running') this.status.status = 'idle';
    } catch {
      // Best effort: keep the default status.
    }
  }

  private saveStatus(): void {
    try {
      Neo4jAutoSync.writeJsonAtomic(STATUS_FILE, this.status);
    } catch (error) {
      console.error('Failed to save sync status:', error);
    }
  }

  /**
   * Reads the last checkpoint.
   * @returns the checkpoint, or null when it is missing or malformed (forcing a full sync).
   */
  private loadCheckpoint(): SyncCheckpoint | null {
    try {
      if (!fs.existsSync(CHECKPOINT_FILE)) return null;
      const parsed: unknown = JSON.parse(fs.readFileSync(CHECKPOINT_FILE, 'utf-8'));
      if (typeof parsed !== 'object' || parsed === null) return null;
      const candidate = parsed as Partial<SyncCheckpoint>;
      if (
        typeof candidate.timestamp !== 'string' ||
        Number.isNaN(Date.parse(candidate.timestamp)) ||
        typeof candidate.nodeCount !== 'number' ||
        typeof candidate.relationshipCount !== 'number'
      ) {
        return null;
      }
      return {
        timestamp: candidate.timestamp,
        nodeCount: candidate.nodeCount,
        relationshipCount: candidate.relationshipCount,
        lastNodeIds: Array.isArray(candidate.lastNodeIds) ? candidate.lastNodeIds : []
      };
    } catch {
      return null;
    }
  }

  private saveCheckpoint(checkpoint: SyncCheckpoint): void {
    try {
      Neo4jAutoSync.writeJsonAtomic(CHECKPOINT_FILE, checkpoint);
    } catch (error) {
      console.error('Failed to save checkpoint:', error);
    }
  }

  /** Writes JSON via a temp file + rename so a crash never leaves a half-written file. */
  private static writeJsonAtomic(file: string, data: unknown): void {
    const tmpFile = `${file}.tmp`;
    fs.writeFileSync(tmpFile, JSON.stringify(data, null, 2));
    fs.renameSync(tmpFile, file);
  }

  // ---------------------------------------------------------------- connections

  /**
   * Opens and verifies drivers for both databases.
   * @throws if the cloud URI is unset or either server is unreachable; in that case
   *   any driver opened so far is closed again.
   */
  async connect(): Promise<void> {
    if (!this.config.cloud.uri) {
      throw new Error('AuraDB cloud URI is not configured (set NEO4J_URI or pass cloud.uri)');
    }
    // Reconnecting must not leak drivers from an earlier connect().
    await this.disconnect();
    console.log('π‘ Connecting to Neo4j instances...');
    try {
      this.localDriver = neo4j.driver(
        this.config.local.uri,
        neo4j.auth.basic(this.config.local.user, this.config.local.password),
        { maxConnectionLifetime: 3 * 60 * 60 * 1000 }
      );
      await this.localDriver.verifyConnectivity();
      console.log(' β Connected to local Neo4j');
      this.cloudDriver = neo4j.driver(
        this.config.cloud.uri,
        neo4j.auth.basic(this.config.cloud.user, this.config.cloud.password),
        { maxConnectionLifetime: 3 * 60 * 60 * 1000 }
      );
      await this.cloudDriver.verifyConnectivity();
      console.log(' β Connected to AuraDB cloud');
    } catch (error) {
      await this.disconnect();
      throw error;
    }
  }

  /** Closes both drivers. Close failures are logged, not thrown, so cleanup paths never fail. */
  async disconnect(): Promise<void> {
    const drivers = [this.localDriver, this.cloudDriver].filter((d): d is Driver => d !== null);
    this.localDriver = null;
    this.cloudDriver = null;
    const results = await Promise.allSettled(drivers.map(driver => driver.close()));
    for (const result of results) {
      if (result.status === 'rejected') {
        console.error('Failed to close Neo4j driver:', result.reason);
      }
    }
  }

  private openSession(target: 'local' | 'cloud'): Session {
    const driver = target === 'local' ? this.localDriver : this.cloudDriver;
    if (!driver) {
      throw new Error(`Not connected to ${target} Neo4j; call connect() first`);
    }
    return driver.session({ database: this.config[target].database });
  }

  /** Runs `work` with one local and one cloud session, closing both afterwards. */
  private async withSessions<T>(work: (local: Session, cloud: Session) => Promise<T>): Promise<T> {
    const localSession = this.openSession('local');
    try {
      const cloudSession = this.openSession('cloud');
      try {
        return await work(localSession, cloudSession);
      } finally {
        await cloudSession.close();
      }
    } finally {
      await localSession.close();
    }
  }

  // ---------------------------------------------------------------- scheduling

  /**
   * Start scheduled auto-sync.
   * @throws if the configured cron expression is invalid (any running job is left untouched).
   */
  startScheduler(): void {
    const schedule = this.config.schedule;
    // Validate first so a bad expression does not silently stop the current job.
    if (!cron.validate(schedule)) {
      throw new Error(`Invalid cron expression for Neo4j auto-sync: "${schedule}"`);
    }
    if (this.cronJob) {
      this.cronJob.stop();
    }
    console.log(`β° Starting Neo4j auto-sync scheduler: ${schedule}`);
    this.cronJob = cron.schedule(schedule, async () => {
      console.log(`\nπ [${new Date().toISOString()}] Scheduled sync starting...`);
      try {
        await this.sync();
      } catch (error) {
        console.error('Scheduled sync failed:', error);
      }
      // Keep the advertised next run current after every tick.
      this.status.nextScheduledSync = this.getNextScheduledTime();
      this.saveStatus();
    });
    const nextRun = this.getNextScheduledTime();
    this.status.nextScheduledSync = nextRun;
    this.saveStatus();
    console.log(` Next sync: ${nextRun}`);
  }

  /** Stops the scheduler (if running) and clears the advertised next run. */
  stopScheduler(): void {
    if (!this.cronJob) return;
    this.cronJob.stop();
    this.cronJob = null;
    delete this.status.nextScheduledSync;
    this.saveStatus();
    console.log('βΉοΈ Scheduler stopped');
  }

  /**
   * Best-effort preview of the next scheduled run, for display only.
   * @param from reference time (defaults to now)
   * @returns an ISO timestamp, or "See cron expression: ..." for unsupported expressions
   */
  private getNextScheduledTime(from: Date = new Date()): string {
    const schedule = this.config.schedule;
    const fallback = 'See cron expression: ' + schedule;
    // Supported shape: numeric minute, hour field "*" + "/" + step, and "*" for
    // day-of-month, month and day-of-week (the shape of the default schedule).
    const fields = schedule.trim().split(/\s+/);
    if (fields.length !== 5) return fallback;
    const [minuteField = '', hourField = '', ...rest] = fields;
    const stepMatch = /^\*\/(\d+)$/.exec(hourField);
    if (!/^\d+$/.test(minuteField) || !stepMatch || rest.some(field => field !== '*')) {
      return fallback;
    }
    const minute = Number(minuteField);
    const step = Number(stepMatch[1] ?? '0');
    if (minute > 59 || step < 1) return fallback;
    // A cron hour step restarts at 0 every day: candidates are 0, step, 2*step, ... < 24 (local time).
    for (let dayOffset = 0; dayOffset <= 1; dayOffset++) {
      for (let hour = 0; hour < 24; hour += step) {
        const candidate = new Date(from);
        candidate.setDate(candidate.getDate() + dayOffset);
        candidate.setHours(hour, minute, 0, 0);
        if (candidate > from) return candidate.toISOString();
      }
    }
    return fallback;
  }

  // ---------------------------------------------------------------- sync entry points

  /**
   * Perform sync (auto-detects full vs incremental).
   *
   * If a sync is already running, this waits for it and returns its result
   * instead of starting a second one; `forceFullSync` is ignored in that case.
   *
   * @param forceFullSync always wipe and re-import the cloud database
   * @returns a snapshot of the status after the sync; failures are reported via
   *   `status: 'error'` rather than thrown
   */
  async sync(forceFullSync: boolean = false): Promise<SyncStatus> {
    if (this.inFlightSync) {
      console.warn('Sync already in progress; waiting for it instead of starting another');
      return this.inFlightSync;
    }
    const run = this.runSync(forceFullSync);
    this.inFlightSync = run;
    try {
      return await run;
    } finally {
      this.inFlightSync = null;
    }
  }

  private async runSync(forceFullSync: boolean): Promise<SyncStatus> {
    const startTime = Date.now();
    this.status.status = 'running';
    this.saveStatus();
    try {
      await this.connect();
      const checkpoint = this.loadCheckpoint();
      if (forceFullSync || !checkpoint || !this.config.incrementalEnabled) {
        await this.fullSync();
        this.status.lastSyncType = 'full';
      } else if (await this.checkForChanges(checkpoint)) {
        if (await this.incrementalSync(checkpoint)) {
          this.status.lastSyncType = 'incremental';
        } else {
          console.log('Incremental sync cannot reproduce these changes; falling back to FULL sync');
          await this.fullSync();
          this.status.lastSyncType = 'full';
        }
      } else {
        console.log('β No changes detected, skipping sync');
        this.status.lastSyncType = 'incremental';
        this.status.nodesSync = 0;
        this.status.relationshipsSync = 0;
      }
      this.status.status = 'success';
      this.status.lastSyncTime = new Date().toISOString();
      this.status.lastSyncDuration = Date.now() - startTime;
      delete this.status.error;
    } catch (error: unknown) {
      this.status.status = 'error';
      this.status.error = Neo4jAutoSync.errorMessage(error);
      console.error('β Sync failed:', this.status.error);
    } finally {
      await this.disconnect();
      this.saveStatus();
    }
    return this.getStatus();
  }

  /**
   * Decides whether the local graph changed since `checkpoint`: either the
   * node/relationship counts differ, or some node has a `createdAt`/`updatedAt`
   * newer than the checkpoint (catches property edits that keep counts equal).
   */
  private async checkForChanges(checkpoint: SyncCheckpoint): Promise<boolean> {
    const session = this.openSession('local');
    try {
      const current = await this.countGraph(session);
      if (current.nodes !== checkpoint.nodeCount || current.relationships !== checkpoint.relationshipCount) {
        console.log(`π Changes detected: nodes ${checkpoint.nodeCount} β ${current.nodes}, rels ${checkpoint.relationshipCount} β ${current.relationships}`);
        return true;
      }
      const touched = await session.run(`
        MATCH (n)
        WHERE n.createdAt > datetime($checkpointTime)
           OR n.updatedAt > datetime($checkpointTime)
        RETURN 1 AS hit
        LIMIT 1
      `, { checkpointTime: new Date(checkpoint.timestamp).toISOString() });
      if (touched.records.length > 0) {
        console.log('Changes detected: nodes created/updated since the last checkpoint');
        return true;
      }
      return false;
    } finally {
      await session.close();
    }
  }

  /** Counts all nodes and relationships visible to `session`. */
  private async countGraph(session: Session): Promise<{ nodes: number; relationships: number }> {
    const nodeResult = await session.run('MATCH (n) RETURN count(n) as count');
    const relResult = await session.run('MATCH ()-[r]->() RETURN count(r) as count');
    return {
      nodes: Neo4jAutoSync.toCount(nodeResult.records[0]?.get('count')),
      relationships: Neo4jAutoSync.toCount(relResult.records[0]?.get('count'))
    };
  }

  /** Wipes the cloud database and re-imports schema, nodes and relationships from local. */
  private async fullSync(): Promise<void> {
    console.log('\nπ Starting FULL sync...');
    // Taken before reading so edits made while we sync are picked up next time.
    const startedAt = new Date();
    await this.withSessions(async (localSession, cloudSession) => {
      const schema = await this.exportSchema(localSession);
      const nodes = await this.exportNodes(localSession);
      const relationships = await this.exportRelationships(localSession);
      // NOTE: everything is exported before the wipe, but an exception during import
      // still leaves the cloud partially populated until the next successful full sync.
      await this.clearCloud(cloudSession);
      await this.applySchema(cloudSession, schema);
      await this.importNodes(cloudSession, nodes);
      // The cloud is empty, so CREATE is safe and keeps parallel relationships of the
      // same type between the same two nodes, which MERGE would collapse into one.
      await this.importRelationships(cloudSession, relationships, nodes, 'create');
      this.saveCheckpoint({
        timestamp: startedAt.toISOString(),
        nodeCount: nodes.length,
        relationshipCount: relationships.length,
        lastNodeIds: nodes.slice(-Neo4jAutoSync.CHECKPOINT_ID_HISTORY).map(node => String(node.properties.id))
      });
      this.status.nodesSync = nodes.length;
      this.status.relationshipsSync = relationships.length;
      console.log(`\nβ Full sync complete: ${nodes.length} nodes, ${relationships.length} relationships`);
    });
  }

  /**
   * Upserts nodes created/updated since `checkpoint` (by their `createdAt` /
   * `updatedAt` properties) together with their relationships.
   *
   * @returns false — without writing a checkpoint — when an incremental sync
   *   cannot faithfully mirror the change, so the caller should run a full sync.
   */
  private async incrementalSync(checkpoint: SyncCheckpoint): Promise<boolean> {
    console.log('\nπ Starting INCREMENTAL sync...');
    // Taken before reading so edits made while we sync are picked up next time.
    const startedAt = new Date();
    const limit = Neo4jAutoSync.INCREMENTAL_NODE_LIMIT;
    return this.withSessions(async (localSession, cloudSession) => {
      // Fetch one extra row so truncation can be detected instead of silently dropping changes.
      const changedResult = await localSession.run(`
        MATCH (n)
        WHERE n.createdAt > datetime($checkpointTime)
           OR n.updatedAt > datetime($checkpointTime)
        RETURN n, elementId(n) as elementId
        LIMIT ${limit + 1}
      `, { checkpointTime: new Date(checkpoint.timestamp).toISOString() });
      const changedNodes: ExportedNode[] = changedResult.records.map(record => {
        const node = record.get('n');
        return {
          labels: [...node.labels],
          properties: this.convertProperties(node.properties),
          elementId: record.get('elementId')
        };
      });

      if (changedNodes.length === 0) {
        // Counts moved but nothing is timestamped as new: deletions or untimestamped nodes.
        console.log(' No timestamped node changes explain the difference');
        return false;
      }
      if (changedNodes.length > limit) {
        console.log(` More than ${limit} changed nodes`);
        return false;
      }
      if (changedNodes.some(node => node.properties.id == null)) {
        // Cloud nodes are matched by `id`; a node without one cannot be upserted reliably.
        console.log(' Changed nodes without an `id` property');
        return false;
      }

      console.log(` Found ${changedNodes.length} new/modified nodes`);
      await this.importNodes(cloudSession, changedNodes);

      const nodeIds = changedNodes.map(node => node.properties.id);
      // Endpoint ids are returned too, so relationships to pre-existing nodes resolve.
      const relResult = await localSession.run(`
        MATCH (a)-[r]->(b)
        WHERE a.id IN $nodeIds OR b.id IN $nodeIds
        RETURN r, type(r) as type, elementId(a) as startId, elementId(b) as endId,
               a.id AS startNodeId, b.id AS endNodeId
      `, { nodeIds });
      const changedRels: ExportedRelationship[] = relResult.records.map(record => ({
        type: record.get('type'),
        properties: this.convertProperties(record.get('r').properties),
        startElementId: record.get('startId'),
        endElementId: record.get('endId'),
        startNodeId: this.convertValue(record.get('startNodeId')),
        endNodeId: this.convertValue(record.get('endNodeId'))
      }));
      if (changedRels.length > 0) {
        // MERGE avoids duplicating relationships already in the cloud (this still
        // collapses parallel same-type relationships; the count check below catches that).
        await this.importRelationships(cloudSession, changedRels, changedNodes, 'merge');
      }

      const localCounts = await this.countGraph(localSession);
      const cloudCounts = await this.countGraph(cloudSession);
      if (localCounts.nodes !== cloudCounts.nodes || localCounts.relationships !== cloudCounts.relationships) {
        console.log(
          ` Cloud (${cloudCounts.nodes} nodes, ${cloudCounts.relationships} rels) differs from local ` +
          `(${localCounts.nodes} nodes, ${localCounts.relationships} rels)`
        );
        return false;
      }

      this.saveCheckpoint({
        timestamp: startedAt.toISOString(),
        nodeCount: localCounts.nodes,
        relationshipCount: localCounts.relationships,
        lastNodeIds: [...checkpoint.lastNodeIds, ...nodeIds.map(id => String(id))]
          .slice(-Neo4jAutoSync.CHECKPOINT_ID_HISTORY)
      });
      this.status.nodesSync = changedNodes.length;
      this.status.relationshipsSync = changedRels.length;
      console.log(`β Incremental sync complete: ${changedNodes.length} nodes`);
      return true;
    });
  }

  // ---------------------------------------------------------------- value conversion

  private convertProperties(props: Record<string, unknown>): Record<string, unknown> {
    const result: Record<string, unknown> = {};
    for (const [key, value] of Object.entries(props)) {
      result[key] = this.convertValue(value);
    }
    return result;
  }

  /**
   * Converts driver values for transfer: safe-range Integers become numbers
   * (out-of-range ones stay Integer so no precision is lost), DateTime/Date
   * become strings, lists are converted element-wise; anything else passes through.
   */
  private convertValue(value: unknown): unknown {
    if (value === null || value === undefined) return value;
    if (neo4j.isInt(value)) return value.inSafeRange() ? value.toNumber() : value;
    if (neo4j.isDateTime(value) || neo4j.isDate(value)) return value.toString();
    if (Array.isArray(value)) return value.map(item => this.convertValue(item));
    return value;
  }

  /** Numeric value of a count column (driver Integer or plain number); 0 if absent. */
  private static toCount(value: unknown): number {
    if (neo4j.isInt(value)) return value.toNumber();
    return typeof value === 'number' ? value : 0;
  }

  private static errorMessage(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }

  /** Backtick-quotes a label, type, property or schema name for interpolation into Cypher. */
  private static quoteIdentifier(name: string): string {
    return '`' + name.replace(/`/g, '``') + '`';
  }

  /** `n.prop` references for a schema definition. */
  private static propertyRefs(properties: string[]): string[] {
    return properties.map(prop => `n.${Neo4jAutoSync.quoteIdentifier(prop)}`);
  }

  private static generateNodeId(): string {
    return `node_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }

  /** True when the range (from, to] crosses a multiple of `every` (progress logging). */
  private static crossedMilestone(from: number, to: number, every: number): boolean {
    return Math.floor(to / every) > Math.floor(from / every);
  }

  // ---------------------------------------------------------------- export

  /**
   * Collects node uniqueness constraints and standalone RANGE node indexes as
   * replayable `CREATE ... IF NOT EXISTS` statements (composite ones included).
   */
  private async exportSchema(session: Session): Promise<SchemaItem[]> {
    const q = Neo4jAutoSync.quoteIdentifier;
    const schema: SchemaItem[] = [];

    const constraintResult = await session.run('SHOW CONSTRAINTS');
    for (const record of constraintResult.records) {
      const name: string = record.get('name');
      const label: string | undefined = record.get('labelsOrTypes')?.[0];
      const properties: string[] = record.get('properties') ?? [];
      if (
        !Neo4jAutoSync.NODE_UNIQUENESS_TYPES.has(record.get('type')) ||
        record.get('entityType') !== 'NODE' ||
        !label ||
        properties.length === 0
      ) {
        continue;
      }
      const refs = Neo4jAutoSync.propertyRefs(properties);
      const target = refs.length === 1 ? refs.join('') : `(${refs.join(', ')})`;
      schema.push({
        type: 'constraint',
        name,
        definition: `CREATE CONSTRAINT ${q(name)} IF NOT EXISTS FOR (n:${q(label)}) REQUIRE ${target} IS UNIQUE`
      });
    }

    const indexResult = await session.run('SHOW INDEXES');
    for (const record of indexResult.records) {
      // Indexes backing a constraint are recreated by the constraint itself.
      if (record.get('owningConstraint')) continue;
      if (record.get('type') !== 'RANGE' || record.get('entityType') !== 'NODE') continue;
      const name: string = record.get('name');
      const label: string | undefined = record.get('labelsOrTypes')?.[0];
      const properties: string[] = record.get('properties') ?? [];
      if (!label || properties.length === 0) continue;
      schema.push({
        type: 'index',
        name,
        definition: `CREATE INDEX ${q(name)} IF NOT EXISTS FOR (n:${q(label)}) ON (${Neo4jAutoSync.propertyRefs(properties).join(', ')})`
      });
    }
    return schema;
  }

  private async exportNodes(session: Session): Promise<ExportedNode[]> {
    console.log('π¦ Exporting nodes...');
    const result = await session.run('MATCH (n) RETURN n, elementId(n) as elementId');
    const nodes: ExportedNode[] = result.records.map(record => {
      const node = record.get('n');
      return {
        labels: [...node.labels],
        properties: this.convertProperties(node.properties),
        elementId: record.get('elementId')
      };
    });
    console.log(` β Exported ${nodes.length} nodes`);
    return nodes;
  }

  private async exportRelationships(session: Session): Promise<ExportedRelationship[]> {
    console.log('π Exporting relationships...');
    const result = await session.run(`
      MATCH (a)-[r]->(b)
      RETURN r, type(r) as type, elementId(a) as startId, elementId(b) as endId
    `);
    const rels: ExportedRelationship[] = result.records.map(record => ({
      type: record.get('type'),
      properties: this.convertProperties(record.get('r').properties),
      startElementId: record.get('startId'),
      endElementId: record.get('endId')
    }));
    console.log(` β Exported ${rels.length} relationships`);
    return rels;
  }

  // ---------------------------------------------------------------- import

  /** Deletes every node (and its relationships) in the cloud database, 1000 at a time. */
  private async clearCloud(session: Session): Promise<void> {
    console.log('ποΈ Clearing cloud database...');
    let deleted = 0;
    do {
      const result = await session.run(`
        MATCH (n) WITH n LIMIT 1000 DETACH DELETE n RETURN count(*) as deleted
      `);
      deleted = Neo4jAutoSync.toCount(result.records[0]?.get('deleted'));
    } while (deleted > 0);
    console.log(' β Cloud cleared');
  }

  private async applySchema(session: Session, schema: SchemaItem[]): Promise<void> {
    console.log('π Applying schema...');
    for (const item of schema) {
      try {
        await session.run(item.definition);
      } catch (error: unknown) {
        const message = Neo4jAutoSync.errorMessage(error);
        // "already exists" failures are expected on re-runs and are not reported.
        if (!message.includes('already exists')) {
          console.warn(` β ${item.name}: ${message.slice(0, 50)}`);
        }
      }
    }
    console.log(` β Applied ${schema.length} schema items`);
  }

  /**
   * Runs an `UNWIND $rows` statement that returns a `written` count. If the
   * batch fails, it is replayed row by row so one bad record does not drop the
   * rest (best effort, like a per-record import).
   */
  private async runBatch(session: Session, query: string, rows: Record<string, unknown>[]): Promise<BatchOutcome> {
    try {
      const result = await session.run(query, { rows });
      return { written: Neo4jAutoSync.toCount(result.records[0]?.get('written')), failed: 0 };
    } catch (error: unknown) {
      if (rows.length <= 1) {
        return { written: 0, failed: rows.length, firstError: Neo4jAutoSync.errorMessage(error) };
      }
      const outcome: BatchOutcome = { written: 0, failed: 0 };
      for (const row of rows) {
        const single = await this.runBatch(session, query, [row]);
        outcome.written += single.written;
        outcome.failed += single.failed;
        if (!outcome.firstError && single.firstError) outcome.firstError = single.firstError;
      }
      return outcome;
    }
  }

  /**
   * Upserts nodes into the cloud, keyed by label set + `id`, `batchSize` at a time.
   * Nodes without an `id` get a generated one; this deliberately mutates
   * `node.properties` so importRelationships can resolve the same id.
   */
  private async importNodes(session: Session, nodes: ExportedNode[]): Promise<void> {
    console.log('π¦ Importing nodes...');
    for (const node of nodes) {
      if (node.properties.id == null) {
        node.properties.id = Neo4jAutoSync.generateNodeId();
      }
    }
    const { batchSize } = this.config;
    let imported = 0;
    let failed = 0;
    let firstError: string | undefined;
    for (let start = 0; start < nodes.length; start += batchSize) {
      const end = Math.min(start + batchSize, nodes.length);
      // Labels cannot be query parameters, so each batch is grouped by label set.
      const rowsByLabels = new Map<string, Record<string, unknown>[]>();
      for (const node of nodes.slice(start, end)) {
        const labelClause = node.labels.map(label => `:${Neo4jAutoSync.quoteIdentifier(label)}`).join('');
        const row = { id: node.properties.id, props: node.properties };
        const rows = rowsByLabels.get(labelClause);
        if (rows) rows.push(row);
        else rowsByLabels.set(labelClause, [row]);
      }
      for (const [labelClause, rows] of rowsByLabels) {
        // An empty labelClause yields `MERGE (n {id: ...})`, which is valid for label-less nodes.
        const outcome = await this.runBatch(session, `
          UNWIND $rows AS row
          MERGE (n${labelClause} {id: row.id})
          SET n = row.props
          RETURN count(n) AS written
        `, rows);
        imported += outcome.written;
        failed += outcome.failed;
        firstError = firstError ?? outcome.firstError;
      }
      if (Neo4jAutoSync.crossedMilestone(start, end, 5000) || end === nodes.length) {
        console.log(` Progress: ${end}/${nodes.length} nodes`);
      }
    }
    if (failed > 0) {
      console.warn(` Failed to import ${failed} nodes (first error: ${firstError ?? 'unknown'})`);
    }
    console.log(` β Imported ${imported} nodes`);
  }

  /**
   * Writes relationships into the cloud, matching endpoints by their `id` property.
   *
   * Endpoint ids come from `nodes` (which includes ids generated by importNodes)
   * and otherwise from `startNodeId`/`endNodeId` on the relationship itself.
   *
   * @param mode 'create' for an empty target (keeps parallel relationships);
   *   'merge' when the relationship may already exist.
   */
  private async importRelationships(
    session: Session,
    relationships: ExportedRelationship[],
    nodes: ExportedNode[],
    mode: 'create' | 'merge' = 'merge'
  ): Promise<void> {
    console.log('π Importing relationships...');
    const elementIdToNodeId = new Map<string, unknown>();
    for (const node of nodes) {
      elementIdToNodeId.set(node.elementId, node.properties.id);
    }
    const verb = mode === 'create' ? 'CREATE' : 'MERGE';
    const { batchSize } = this.config;
    let imported = 0;
    let failed = 0;
    let skipped = 0;
    let firstError: string | undefined;
    for (let start = 0; start < relationships.length; start += batchSize) {
      const end = Math.min(start + batchSize, relationships.length);
      // Relationship types cannot be query parameters, so each batch is grouped by type.
      const rowsByType = new Map<string, Record<string, unknown>[]>();
      for (const rel of relationships.slice(start, end)) {
        const startId = elementIdToNodeId.get(rel.startElementId) ?? rel.startNodeId;
        const endId = elementIdToNodeId.get(rel.endElementId) ?? rel.endNodeId;
        if (startId == null || endId == null) {
          skipped++;
          continue;
        }
        const row = { startId, endId, props: rel.properties };
        const rows = rowsByType.get(rel.type);
        if (rows) rows.push(row);
        else rowsByType.set(rel.type, [row]);
      }
      for (const [type, rows] of rowsByType) {
        const outcome = await this.runBatch(session, `
          UNWIND $rows AS row
          MATCH (a {id: row.startId})
          MATCH (b {id: row.endId})
          ${verb} (a)-[r:${Neo4jAutoSync.quoteIdentifier(type)}]->(b)
          SET r = row.props
          RETURN count(r) AS written
        `, rows);
        imported += outcome.written;
        failed += outcome.failed;
        firstError = firstError ?? outcome.firstError;
      }
      if (Neo4jAutoSync.crossedMilestone(start, end, 50000) || end === relationships.length) {
        console.log(` Progress: ${end}/${relationships.length} relationships`);
      }
    }
    if (skipped > 0) {
      console.warn(` Skipped ${skipped} relationships whose endpoints have no id`);
    }
    if (failed > 0) {
      console.warn(` Failed to import ${failed} relationships (first error: ${firstError ?? 'unknown'})`);
    }
    console.log(` β Imported ${imported} relationships`);
  }

  /** Returns a shallow copy of the current status. */
  getStatus(): SyncStatus {
    return { ...this.status };
  }
}
// Process-wide shared instance; stays null until first requested.
let autoSyncInstance: Neo4jAutoSync | null = null;

/**
 * Returns the shared Neo4jAutoSync, constructing it on the first call with no
 * explicit config (so settings come from the NEO4J_* environment variables and
 * built-in defaults).
 */
export function getNeo4jAutoSync(): Neo4jAutoSync {
  return autoSyncInstance ?? (autoSyncInstance = new Neo4jAutoSync());
}