// Kraft102's picture
// Update backend source
// 34367da verified
/**
* Neo4j Auto-Sync Service
*
* Automated synchronization between local Neo4j and AuraDB cloud.
* Supports full sync, incremental sync, and scheduled execution.
*/
import neo4j, { Driver, Session } from 'neo4j-driver';
import cron from 'node-cron';
import fs from 'fs';
import path from 'path';
/**
 * Connection and behaviour settings for the local-to-cloud Neo4j sync.
 */
export interface SyncConfig {
/** Source database that is read during a sync. */
local: {
uri: string;
user: string;
password: string;
database: string;
};
/** Target database that is written during a sync. */
cloud: {
uri: string;
user: string;
password: string;
database: string;
};
/** Expression handed to `cron.schedule` when the scheduler is started. */
schedule?: string; // Cron expression
/** When false, every sync is a full sync even if a checkpoint exists. */
incrementalEnabled?: boolean;
/** Chunk size used when walking nodes and relationships during import. */
batchSize?: number;
}
/**
 * Outcome of the most recent sync; persisted to the status file and returned to callers.
 */
export interface SyncStatus {
/** ISO-8601 time at which the last successful sync finished, or null if none has. */
lastSyncTime: string | null;
/** Which strategy the last sync used. */
lastSyncType: 'full' | 'incremental' | null;
/** Wall-clock duration of the last successful sync, in milliseconds. */
lastSyncDuration: number | null;
/** Number of nodes handled by the last sync. */
nodesSync: number;
/** Number of relationships handled by the last sync. */
relationshipsSync: number;
/** Current lifecycle state; reset to 'idle' when the status file is loaded. */
status: 'idle' | 'running' | 'success' | 'error';
/** Message of the error that failed the last sync; removed again on success. */
error?: string;
/** Next planned run: an ISO-8601 time, or a textual pointer to the cron expression. */
nextScheduledSync?: string;
}
/**
 * Snapshot written after a sync; the next run compares against it to detect changes.
 */
export interface SyncCheckpoint {
/** ISO-8601 time at which the checkpoint was written. */
timestamp: string;
/** Node count of the local database at checkpoint time. */
nodeCount: number;
/** Relationship count of the local database at checkpoint time. */
relationshipCount: number;
/** `id` property values of the most recently synced nodes (at most the last 100). */
lastNodeIds: string[];
}
// Sync bookkeeping is kept in dot-files inside the working directory that is
// current when this module is loaded.
const CHECKPOINT_FILE = path.resolve(process.cwd(), '.neo4j-sync-checkpoint.json');
const STATUS_FILE = path.resolve(process.cwd(), '.neo4j-sync-status.json');
/** A node read from the source database, flattened to plain JS values. */
interface ExportedNode {
  labels: string[];
  properties: Record<string, any>;
  /** elementId of the node in the database it was read from. */
  elementId: string;
}

/** A relationship read from the source database, flattened to plain JS values. */
interface ExportedRelationship {
  type: string;
  properties: Record<string, any>;
  startElementId: string;
  endElementId: string;
}

/** One schema statement (constraint or index) to replay on the target database. */
interface SchemaStatement {
  type: 'constraint' | 'index';
  name: string;
  definition: string;
}

/**
 * Copies the content of a local Neo4j database to a cloud (AuraDB) database.
 *
 * Two strategies are available:
 *  - full sync: export schema, nodes and relationships, wipe the target, re-import everything;
 *  - incremental sync: import nodes that look new or modified since the last checkpoint,
 *    plus the relationships touching them.
 *
 * Nodes are matched across databases by their `id` property; nodes without one get a
 * generated id on import. Status and checkpoint are persisted as JSON files so they
 * survive restarts.
 *
 * NOTE(review): incremental sync never propagates deletions, and relationship-only
 * changes are only picked up when one endpoint is also selected as new/modified.
 * Run a full sync when exact parity is required.
 */
export class Neo4jAutoSync {
  private static readonly DEFAULT_SCHEDULE = '0 */6 * * *';
  private static readonly DEFAULT_BATCH_SIZE = 500;
  /** Row cap applied to the incremental "new/modified nodes" query. */
  private static readonly INCREMENTAL_NODE_LIMIT = 10000;
  /** Maximum number of per-item import errors printed per import pass. */
  private static readonly MAX_LOGGED_IMPORT_ERRORS = 5;

  private config: SyncConfig;
  private localDriver: Driver | null = null;
  private cloudDriver: Driver | null = null;
  private cronJob: cron.ScheduledTask | null = null;
  /** True while a sync() call is in flight; prevents overlapping runs sharing the drivers. */
  private syncInProgress = false;
  private status: SyncStatus = {
    lastSyncTime: null,
    lastSyncType: null,
    lastSyncDuration: null,
    nodesSync: 0,
    relationshipsSync: 0,
    status: 'idle'
  };

  /**
   * @param config Optional overrides. Each missing value falls back to the matching
   *   environment variable and then to a built-in default.
   */
  constructor(config?: Partial<SyncConfig>) {
    // A non-positive or non-finite batch size would make the import loops spin forever.
    const requestedBatchSize = config?.batchSize;
    const batchSize =
      typeof requestedBatchSize === 'number' && Number.isFinite(requestedBatchSize) && requestedBatchSize >= 1
        ? Math.floor(requestedBatchSize)
        : Neo4jAutoSync.DEFAULT_BATCH_SIZE;

    this.config = {
      local: {
        uri: config?.local?.uri || process.env.NEO4J_LOCAL_URI || 'bolt://localhost:7687',
        user: config?.local?.user || process.env.NEO4J_LOCAL_USER || 'neo4j',
        password: config?.local?.password || process.env.NEO4J_LOCAL_PASSWORD || 'password',
        database: config?.local?.database || process.env.NEO4J_LOCAL_DATABASE || 'neo4j'
      },
      cloud: {
        uri: config?.cloud?.uri || process.env.NEO4J_URI || '',
        user: config?.cloud?.user || process.env.NEO4J_USER || 'neo4j',
        password: config?.cloud?.password || process.env.NEO4J_PASSWORD || '',
        database: config?.cloud?.database || process.env.NEO4J_DATABASE || 'neo4j'
      },
      // Default: every 6 hours
      schedule: config?.schedule || process.env.NEO4J_SYNC_SCHEDULE || Neo4jAutoSync.DEFAULT_SCHEDULE,
      incrementalEnabled: config?.incrementalEnabled ?? true,
      batchSize
    };
    this.loadStatus();
  }

  /** Wraps a label, relationship type, property key or schema name in backticks for safe use in Cypher. */
  private static quoteIdentifier(name: string): string {
    return '`' + String(name).replace(/`/g, '``') + '`';
  }

  /** Extracts a printable message from an arbitrary thrown value. */
  private static describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }

  /** Type guard: true when `value` has the shape of a usable checkpoint. */
  private static isCheckpoint(value: unknown): value is SyncCheckpoint {
    if (typeof value !== 'object' || value === null) return false;
    const candidate = value as Record<string, unknown>;
    return (
      typeof candidate.timestamp === 'string' &&
      !Number.isNaN(Date.parse(candidate.timestamp)) &&
      typeof candidate.nodeCount === 'number' &&
      typeof candidate.relationshipCount === 'number' &&
      Array.isArray(candidate.lastNodeIds)
    );
  }

  /**
   * Restores the persisted status, if any. The in-memory defaults are kept when the file
   * is missing, unreadable, or does not contain a JSON object.
   */
  private loadStatus(): void {
    try {
      if (!fs.existsSync(STATUS_FILE)) return;
      const parsed: unknown = JSON.parse(fs.readFileSync(STATUS_FILE, 'utf-8'));
      // Guard against `null`, arrays and primitives so this.status always stays a valid object.
      if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) return;
      this.status = {
        ...this.status,
        ...(parsed as Partial<SyncStatus>),
        status: 'idle' // Reset running status on restart
      };
    } catch {
      // Ignore errors, use default status
    }
  }

  /** Persists the current status; failures are logged and otherwise ignored. */
  private saveStatus(): void {
    try {
      fs.writeFileSync(STATUS_FILE, JSON.stringify(this.status, null, 2));
    } catch (error) {
      console.error('Failed to save sync status:', error);
    }
  }

  /**
   * Reads the last checkpoint.
   * @returns The checkpoint, or null when it is missing, unreadable or malformed
   *   (which makes the next sync a full sync).
   */
  private loadCheckpoint(): SyncCheckpoint | null {
    try {
      if (!fs.existsSync(CHECKPOINT_FILE)) return null;
      const parsed: unknown = JSON.parse(fs.readFileSync(CHECKPOINT_FILE, 'utf-8'));
      return Neo4jAutoSync.isCheckpoint(parsed) ? parsed : null;
    } catch {
      // Ignore errors
      return null;
    }
  }

  /** Persists a checkpoint; failures are logged and otherwise ignored. */
  private saveCheckpoint(checkpoint: SyncCheckpoint): void {
    try {
      fs.writeFileSync(CHECKPOINT_FILE, JSON.stringify(checkpoint, null, 2));
    } catch (error) {
      console.error('Failed to save checkpoint:', error);
    }
  }

  /**
   * Opens and verifies drivers for both databases. Drivers left over from an earlier
   * call are closed first so they are not leaked.
   * @throws If the cloud URI is not configured or either database is unreachable.
   */
  async connect(): Promise<void> {
    console.log('πŸ“‘ Connecting to Neo4j instances...');
    await this.disconnect();
    if (!this.config.cloud.uri) {
      throw new Error('Cloud Neo4j URI is not configured (set NEO4J_URI or pass config.cloud.uri)');
    }
    const driverOptions = { maxConnectionLifetime: 3 * 60 * 60 * 1000 };

    this.localDriver = neo4j.driver(
      this.config.local.uri,
      neo4j.auth.basic(this.config.local.user, this.config.local.password),
      driverOptions
    );
    await this.localDriver.verifyConnectivity();
    console.log(' βœ“ Connected to local Neo4j');

    this.cloudDriver = neo4j.driver(
      this.config.cloud.uri,
      neo4j.auth.basic(this.config.cloud.user, this.config.cloud.password),
      driverOptions
    );
    await this.cloudDriver.verifyConnectivity();
    console.log(' βœ“ Connected to AuraDB cloud');
  }

  /** Closes both drivers. The cloud driver is closed even if closing the local one throws. */
  async disconnect(): Promise<void> {
    const local = this.localDriver;
    const cloud = this.cloudDriver;
    this.localDriver = null;
    this.cloudDriver = null;
    try {
      if (local) await local.close();
    } finally {
      if (cloud) await cloud.close();
    }
  }

  /**
   * Start scheduled auto-sync
   *
   * Replaces any schedule that is already running. After each scheduled run the
   * persisted `nextScheduledSync` is refreshed.
   */
  startScheduler(): void {
    if (this.cronJob) {
      this.cronJob.stop();
    }
    console.log(`⏰ Starting Neo4j auto-sync scheduler: ${this.config.schedule}`);
    const schedule = this.config.schedule ?? Neo4jAutoSync.DEFAULT_SCHEDULE;
    this.cronJob = cron.schedule(schedule, async () => {
      console.log(`\nπŸ”„ [${new Date().toISOString()}] Scheduled sync starting...`);
      try {
        await this.sync();
      } catch (error) {
        console.error('Scheduled sync failed:', error);
      } finally {
        // Keep the advertised next run current instead of pointing at a time in the past.
        this.status.nextScheduledSync = this.getNextScheduledTime();
        this.saveStatus();
      }
    });
    // Calculate next run time
    const nextRun = this.getNextScheduledTime();
    this.status.nextScheduledSync = nextRun;
    this.saveStatus();
    console.log(` Next sync: ${nextRun}`);
  }

  /** Stops the scheduler if it is running. */
  stopScheduler(): void {
    if (this.cronJob) {
      this.cronJob.stop();
      this.cronJob = null;
      console.log('⏹️ Scheduler stopped');
    }
  }

  /**
   * Estimates the next run for schedules of the form `<minute> *\/<N> * * *`
   * (fixed minute, every N hours), evaluated in local time.
   * @returns An ISO-8601 timestamp, or a textual pointer to the cron expression for
   *   any other pattern.
   */
  private getNextScheduledTime(): string {
    const schedule = this.config.schedule ?? Neo4jAutoSync.DEFAULT_SCHEDULE;
    const fields = schedule.trim().split(/\s+/);
    const [minuteField = '', hourField = '', ...remaining] = fields;
    const stepMatch = /^\*\/(\d+)$/.exec(hourField);
    const isEveryNHours =
      fields.length === 5 &&
      /^\d+$/.test(minuteField) &&
      stepMatch !== null &&
      remaining.every((field) => field === '*');

    if (isEveryNHours && stepMatch) {
      const minute = Number(minuteField);
      const step = Number(stepMatch[1]);
      if (minute < 60 && step >= 1 && step <= 23) {
        const now = new Date();
        const next = new Date(now);
        next.setMinutes(minute, 0, 0);
        if (next <= now) next.setHours(next.getHours() + 1);
        // Walk forward hour by hour to the next hour that is a multiple of the step;
        // setHours rolls over into the next day when needed.
        for (let guard = 0; guard < 48 && next.getHours() % step !== 0; guard++) {
          next.setHours(next.getHours() + 1);
        }
        if (next.getHours() % step === 0) {
          return next.toISOString();
        }
      }
    }
    return 'See cron expression: ' + schedule;
  }

  /**
   * Perform sync (auto-detects full vs incremental)
   *
   * A full sync runs when forced, when no valid checkpoint exists, or when incremental
   * sync is disabled. Failures are reported through the returned status rather than by
   * rejecting. If a sync is already running, the call returns the current status
   * without starting another one.
   *
   * @param forceFullSync Skip change detection and always run a full sync.
   * @returns The status object after the run.
   */
  async sync(forceFullSync: boolean = false): Promise<SyncStatus> {
    if (this.syncInProgress) {
      console.warn('Sync already in progress, skipping this request');
      return this.status;
    }
    this.syncInProgress = true;
    const startTime = Date.now();
    this.status.status = 'running';
    this.saveStatus();
    try {
      await this.connect();
      const checkpoint = this.loadCheckpoint();
      if (forceFullSync || !checkpoint || !this.config.incrementalEnabled) {
        await this.fullSync();
        this.status.lastSyncType = 'full';
      } else {
        if (await this.checkForChanges(checkpoint)) {
          await this.incrementalSync(checkpoint);
        } else {
          console.log('βœ“ No changes detected, skipping sync');
        }
        this.status.lastSyncType = 'incremental';
      }
      this.status.status = 'success';
      this.status.lastSyncTime = new Date().toISOString();
      this.status.lastSyncDuration = Date.now() - startTime;
      delete this.status.error;
    } catch (error: unknown) {
      const message = Neo4jAutoSync.describeError(error);
      this.status.status = 'error';
      this.status.error = message;
      console.error('❌ Sync failed:', message);
    } finally {
      try {
        await this.disconnect();
      } catch (closeError) {
        // A failing close must not hide the sync outcome or skip persisting the status.
        console.error('Failed to close Neo4j drivers:', closeError);
      }
      this.syncInProgress = false;
      this.saveStatus();
    }
    return this.status;
  }

  /** Runs a query that returns a single `count` column and converts it to a JS number. */
  private async fetchCount(session: Session, cypher: string): Promise<number> {
    const result = await session.run(cypher);
    return result.records[0].get('count').toNumber();
  }

  /**
   * Cheap change detection: compares the local node and relationship totals with the
   * checkpoint. Changes that leave both totals unchanged go undetected.
   */
  private async checkForChanges(checkpoint: SyncCheckpoint): Promise<boolean> {
    const session = this.localDriver!.session({ database: this.config.local.database });
    try {
      const currentNodeCount = await this.fetchCount(session, 'MATCH (n) RETURN count(n) as count');
      const currentRelCount = await this.fetchCount(session, 'MATCH ()-[r]->() RETURN count(r) as count');
      const hasChanges =
        currentNodeCount !== checkpoint.nodeCount || currentRelCount !== checkpoint.relationshipCount;
      if (hasChanges) {
        console.log(`πŸ“Š Changes detected: nodes ${checkpoint.nodeCount} β†’ ${currentNodeCount}, rels ${checkpoint.relationshipCount} β†’ ${currentRelCount}`);
      }
      return hasChanges;
    } finally {
      await session.close();
    }
  }

  /**
   * Replaces the cloud content with a complete copy of the local database.
   *
   * Everything is exported before the cloud is cleared, so a failing export leaves the
   * cloud untouched. A failure after the clear leaves the cloud partially populated.
   */
  private async fullSync(): Promise<void> {
    console.log('\nπŸ”„ Starting FULL sync...');
    const localSession = this.localDriver!.session({ database: this.config.local.database });
    const cloudSession = this.cloudDriver!.session({ database: this.config.cloud.database });
    try {
      const schema = await this.exportSchema(localSession);
      const nodes = await this.exportNodes(localSession);
      const relationships = await this.exportRelationships(localSession);

      await this.clearCloud(cloudSession);
      await this.applySchema(cloudSession, schema);

      const elementIdMap = new Map<string, string>();
      await this.importNodes(cloudSession, nodes, elementIdMap);
      await this.importRelationships(cloudSession, relationships, elementIdMap, nodes);

      this.saveCheckpoint({
        timestamp: new Date().toISOString(),
        nodeCount: nodes.length,
        relationshipCount: relationships.length,
        lastNodeIds: nodes.slice(-100).map((n) => n.properties.id)
      });
      this.status.nodesSync = nodes.length;
      this.status.relationshipsSync = relationships.length;
      console.log(`\nβœ… Full sync complete: ${nodes.length} nodes, ${relationships.length} relationships`);
    } finally {
      try {
        await localSession.close();
      } finally {
        await cloudSession.close();
      }
    }
  }

  /**
   * Imports nodes that look new or modified since the checkpoint, plus every
   * relationship touching them.
   *
   * A node is selected when its `createdAt`/`updatedAt` is later than the checkpoint or
   * its `id` is not among the checkpoint's remembered ids.
   */
  private async incrementalSync(checkpoint: SyncCheckpoint): Promise<void> {
    console.log('\nπŸ”„ Starting INCREMENTAL sync...');
    const localSession = this.localDriver!.session({ database: this.config.local.database });
    const cloudSession = this.cloudDriver!.session({ database: this.config.cloud.database });
    try {
      const nodeLimit = Neo4jAutoSync.INCREMENTAL_NODE_LIMIT;
      // Get nodes created/modified after checkpoint
      // This requires nodes to have a 'updatedAt' or 'createdAt' property
      const newNodesResult = await localSession.run(`
        MATCH (n)
        WHERE n.createdAt > datetime($checkpointTime)
           OR n.updatedAt > datetime($checkpointTime)
           OR NOT n.id IN $existingIds
        RETURN n, elementId(n) as elementId
        LIMIT ${nodeLimit}
      `, {
        checkpointTime: new Date(checkpoint.timestamp).toISOString(),
        existingIds: checkpoint.lastNodeIds
      });

      const newNodes: ExportedNode[] = [];
      for (const record of newNodesResult.records) {
        const node = record.get('n');
        newNodes.push({
          labels: [...node.labels],
          properties: this.convertProperties(node.properties),
          elementId: record.get('elementId')
        });
      }

      if (newNodes.length === 0) {
        console.log(' No new nodes to sync');
        // Report what this run actually did rather than the previous run's counters.
        this.status.nodesSync = 0;
        this.status.relationshipsSync = 0;
        return;
      }
      if (newNodes.length >= nodeLimit) {
        console.warn(` Incremental node query hit its limit of ${nodeLimit}; remaining changes need another run or a full sync`);
      }
      console.log(` Found ${newNodes.length} new/modified nodes`);

      const elementIdMap = new Map<string, string>();
      await this.importNodes(cloudSession, newNodes, elementIdMap);

      // importNodes assigns a generated id to nodes that had none.
      const nodeIds = newNodes
        .map((n) => n.properties.id)
        .filter((id) => id !== undefined && id !== null);

      let syncedRelationships = 0;
      if (nodeIds.length > 0) {
        const newRelsResult = await localSession.run(`
          MATCH (a)-[r]->(b)
          WHERE a.id IN $nodeIds OR b.id IN $nodeIds
          RETURN r, type(r) as type, elementId(a) as startId, elementId(b) as endId,
                 a.id as startNodeId, b.id as endNodeId
        `, { nodeIds });

        const newRels: ExportedRelationship[] = [];
        // Endpoint lookup for importRelationships: the new nodes plus stubs for
        // already-synced nodes at the other end of a relationship. Without the stubs,
        // relationships between a new node and an existing node would be skipped.
        const endpointNodes: ExportedNode[] = [...newNodes];
        const knownElementIds = new Set<string>(newNodes.map((n) => n.elementId));

        for (const record of newRelsResult.records) {
          const startElementId: string = record.get('startId');
          const endElementId: string = record.get('endId');
          newRels.push({
            type: record.get('type'),
            properties: this.convertProperties(record.get('r').properties),
            startElementId,
            endElementId
          });

          const endpoints: Array<[string, unknown]> = [
            [startElementId, record.get('startNodeId')],
            [endElementId, record.get('endNodeId')]
          ];
          for (const [elementId, rawId] of endpoints) {
            if (knownElementIds.has(elementId)) continue;
            knownElementIds.add(elementId);
            const id = this.convertProperties({ id: rawId }).id;
            // Endpoints without an id cannot be located in the cloud.
            if (id === undefined || id === null) continue;
            endpointNodes.push({ labels: [], properties: { id }, elementId });
          }
        }

        if (newRels.length > 0) {
          await this.importRelationships(cloudSession, newRels, elementIdMap, endpointNodes);
          syncedRelationships = newRels.length;
        }
      }

      // Update checkpoint
      const nodeCount = await this.fetchCount(localSession, 'MATCH (n) RETURN count(n) as count');
      const relationshipCount = await this.fetchCount(localSession, 'MATCH ()-[r]->() RETURN count(r) as count');
      this.saveCheckpoint({
        timestamp: new Date().toISOString(),
        nodeCount,
        relationshipCount,
        lastNodeIds: [...checkpoint.lastNodeIds, ...nodeIds].slice(-100)
      });
      this.status.nodesSync = newNodes.length;
      this.status.relationshipsSync = syncedRelationships;
      console.log(`βœ… Incremental sync complete: ${newNodes.length} nodes`);
    } finally {
      try {
        await localSession.close();
      } finally {
        await cloudSession.close();
      }
    }
  }

  /**
   * Flattens driver values into plain JS values: objects whose constructor is named
   * `Integer` become numbers, `DateTime`/`Date` become strings, arrays are converted
   * element-wise, everything else passes through unchanged.
   *
   * NOTE(review): numbers produced here are presumably written back as floats, and
   * temporal values as strings, so property types on the target may differ from the
   * source. Confirm this is acceptable.
   */
  private convertProperties(props: Record<string, any>): Record<string, any> {
    const convertValue = (value: any): any => {
      if (value === null || value === undefined) return value;
      if (typeof value === 'object') {
        const typeName = value.constructor?.name;
        if (typeName === 'Integer') return value.toNumber();
        if (typeName === 'DateTime' || typeName === 'Date') return value.toString();
        if (Array.isArray(value)) return value.map(convertValue);
      }
      return value;
    };

    const result: Record<string, any> = {};
    for (const [key, value] of Object.entries(props)) {
      result[key] = convertValue(value);
    }
    return result;
  }

  /**
   * Builds CREATE statements for node uniqueness constraints and stand-alone node RANGE
   * indexes, including composite (multi-property) ones. All identifiers are
   * backtick-quoted. Other constraint and index kinds are not exported.
   */
  private async exportSchema(session: Session): Promise<SchemaStatement[]> {
    const quote = Neo4jAutoSync.quoteIdentifier;
    const schema: SchemaStatement[] = [];

    const constraintResult = await session.run('SHOW CONSTRAINTS');
    for (const record of constraintResult.records) {
      const type = record.get('type');
      // NOTE(review): newer servers are believed to report 'NODE_PROPERTY_UNIQUENESS'
      // instead of 'UNIQUENESS'; both are accepted. Confirm against the server version.
      const isNodeUniqueness =
        (type === 'UNIQUENESS' || type === 'NODE_PROPERTY_UNIQUENESS') && record.get('entityType') === 'NODE';
      if (!isNodeUniqueness) continue;

      const name: string = record.get('name');
      const label: string | undefined = record.get('labelsOrTypes')?.[0];
      const properties: string[] = record.get('properties') ?? [];
      if (!label || properties.length === 0) continue;

      const propertyList = properties.map((prop) => `n.${quote(prop)}`).join(', ');
      const target = properties.length === 1 ? propertyList : `(${propertyList})`;
      schema.push({
        type: 'constraint',
        name,
        definition: `CREATE CONSTRAINT ${quote(name)} IF NOT EXISTS FOR (n:${quote(label)}) REQUIRE ${target} IS UNIQUE`
      });
    }

    const indexResult = await session.run('SHOW INDEXES');
    for (const record of indexResult.records) {
      // Indexes backing a constraint are created together with that constraint.
      if (record.get('owningConstraint')) continue;
      if (record.get('type') !== 'RANGE' || record.get('entityType') !== 'NODE') continue;

      const name: string = record.get('name');
      const label: string | undefined = record.get('labelsOrTypes')?.[0];
      const properties: string[] = record.get('properties') ?? [];
      if (!label || properties.length === 0) continue;

      const propertyList = properties.map((prop) => `n.${quote(prop)}`).join(', ');
      schema.push({
        type: 'index',
        name,
        definition: `CREATE INDEX ${quote(name)} IF NOT EXISTS FOR (n:${quote(label)}) ON (${propertyList})`
      });
    }
    return schema;
  }

  /** Reads every node of the session's database into memory. */
  private async exportNodes(session: Session): Promise<ExportedNode[]> {
    console.log('πŸ“¦ Exporting nodes...');
    const result = await session.run('MATCH (n) RETURN n, elementId(n) as elementId');
    const nodes: ExportedNode[] = [];
    for (const record of result.records) {
      const node = record.get('n');
      nodes.push({
        labels: [...node.labels],
        properties: this.convertProperties(node.properties),
        elementId: record.get('elementId')
      });
    }
    console.log(` βœ“ Exported ${nodes.length} nodes`);
    return nodes;
  }

  /** Reads every relationship of the session's database into memory. */
  private async exportRelationships(session: Session): Promise<ExportedRelationship[]> {
    console.log('πŸ”— Exporting relationships...');
    const result = await session.run(`
      MATCH (a)-[r]->(b)
      RETURN r, type(r) as type, elementId(a) as startId, elementId(b) as endId
    `);
    const rels: ExportedRelationship[] = [];
    for (const record of result.records) {
      rels.push({
        type: record.get('type'),
        properties: this.convertProperties(record.get('r').properties),
        startElementId: record.get('startId'),
        endElementId: record.get('endId')
      });
    }
    console.log(` βœ“ Exported ${rels.length} relationships`);
    return rels;
  }

  /** Deletes all nodes and relationships from the target, 1000 nodes per query. */
  private async clearCloud(session: Session): Promise<void> {
    console.log('πŸ—‘οΈ Clearing cloud database...');
    let deleted = 0;
    do {
      const result = await session.run(`
        MATCH (n) WITH n LIMIT 1000 DETACH DELETE n RETURN count(*) as deleted
      `);
      deleted = result.records[0]?.get('deleted')?.toNumber() || 0;
    } while (deleted > 0);
    console.log(' βœ“ Cloud cleared');
  }

  /** Replays schema statements on the target; individual failures are logged and skipped. */
  private async applySchema(session: Session, schema: SchemaStatement[]): Promise<void> {
    console.log('πŸ“‹ Applying schema...');
    for (const item of schema) {
      try {
        await session.run(item.definition);
      } catch (error: unknown) {
        const message = Neo4jAutoSync.describeError(error);
        if (!message.includes('already exists')) {
          console.warn(` ⚠ ${item.name}: ${message.slice(0, 50)}`);
        }
      }
    }
    console.log(` βœ“ Applied ${schema.length} schema items`);
  }

  /**
   * Upserts nodes into the target, keyed by their labels and `id` property.
   *
   * Side effects: nodes without an `id` (undefined, null or empty string) get a
   * generated one written back into `node.properties`; `elementIdMap` receives
   * source elementId -> target elementId for each imported node.
   * A node that fails to import is logged and skipped; it does not abort the run.
   */
  private async importNodes(session: Session, nodes: ExportedNode[], elementIdMap: Map<string, string>): Promise<void> {
    console.log('πŸ“¦ Importing nodes...');
    const batchSize = this.config.batchSize ?? Neo4jAutoSync.DEFAULT_BATCH_SIZE;
    let imported = 0;
    let failed = 0;
    for (let i = 0; i < nodes.length; i += batchSize) {
      for (const node of nodes.slice(i, i + batchSize)) {
        try {
          // Empty for unlabeled nodes, so the MERGE pattern stays valid Cypher.
          const labelClause = node.labels
            .map((label) => ':' + Neo4jAutoSync.quoteIdentifier(label))
            .join('');
          const currentId = node.properties.id;
          const nodeId =
            currentId === undefined || currentId === null || currentId === ''
              ? `node_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`
              : currentId;
          node.properties.id = nodeId;
          const result = await session.run(`
            MERGE (n${labelClause} {id: $nodeId})
            SET n = $props
            RETURN elementId(n) as newElementId
          `, { nodeId, props: node.properties });
          const newElementId = result.records[0]?.get('newElementId');
          if (newElementId) {
            elementIdMap.set(node.elementId, newElementId);
          }
          imported++;
        } catch (error: unknown) {
          // Continue on error, but leave a trace of what was lost.
          failed++;
          if (failed <= Neo4jAutoSync.MAX_LOGGED_IMPORT_ERRORS) {
            console.warn(` Failed to import node ${node.elementId}: ${Neo4jAutoSync.describeError(error)}`);
          }
        }
      }
      if ((i + batchSize) % 5000 === 0 || i + batchSize >= nodes.length) {
        console.log(` Progress: ${Math.min(i + batchSize, nodes.length)}/${nodes.length} nodes`);
      }
    }
    console.log(` βœ“ Imported ${imported} nodes`);
    if (failed > 0) {
      console.warn(` ${failed} node(s) could not be imported`);
    }
  }

  /**
   * Upserts relationships into the target. Endpoints are located by the `id` property
   * of the entries in `nodes`, looked up through their source elementId.
   *
   * Relationships with an endpoint missing from `nodes` are skipped. A relationship
   * that fails to import is logged and skipped; it does not abort the run.
   * `elementIdMap` is accepted for signature compatibility and is not used.
   *
   * NOTE(review): MERGE collapses parallel relationships of the same type between the
   * same two nodes into one. Confirm the source graph has none.
   */
  private async importRelationships(
    session: Session,
    relationships: ExportedRelationship[],
    elementIdMap: Map<string, string>,
    nodes: ExportedNode[]
  ): Promise<void> {
    console.log('πŸ”— Importing relationships...');
    const elementIdToNodeId = new Map<string, any>();
    for (const node of nodes) {
      elementIdToNodeId.set(node.elementId, node.properties.id);
    }

    const batchSize = this.config.batchSize ?? Neo4jAutoSync.DEFAULT_BATCH_SIZE;
    let imported = 0;
    let skipped = 0;
    let failed = 0;
    for (let i = 0; i < relationships.length; i += batchSize) {
      for (const rel of relationships.slice(i, i + batchSize)) {
        try {
          const startNodeId = elementIdToNodeId.get(rel.startElementId);
          const endNodeId = elementIdToNodeId.get(rel.endElementId);
          if (startNodeId === undefined || startNodeId === null || endNodeId === undefined || endNodeId === null) {
            skipped++;
            continue;
          }
          await session.run(`
            MATCH (a {id: $startId})
            MATCH (b {id: $endId})
            MERGE (a)-[r:${Neo4jAutoSync.quoteIdentifier(rel.type)}]->(b)
            SET r = $props
          `, { startId: startNodeId, endId: endNodeId, props: rel.properties });
          imported++;
        } catch (error: unknown) {
          // Continue on error, but leave a trace of what was lost.
          failed++;
          if (failed <= Neo4jAutoSync.MAX_LOGGED_IMPORT_ERRORS) {
            console.warn(` Failed to import relationship ${rel.type}: ${Neo4jAutoSync.describeError(error)}`);
          }
        }
      }
      if ((i + batchSize) % 50000 === 0 || i + batchSize >= relationships.length) {
        console.log(` Progress: ${Math.min(i + batchSize, relationships.length)}/${relationships.length} relationships`);
      }
    }
    console.log(` βœ“ Imported ${imported} relationships`);
    if (skipped > 0 || failed > 0) {
      console.warn(` ${skipped} relationship(s) skipped (unknown endpoint), ${failed} failed`);
    }
  }

  /** Returns a shallow copy of the current status. */
  getStatus(): SyncStatus {
    return { ...this.status };
  }
}
// Module-wide shared instance, created on first request.
let autoSyncInstance: Neo4jAutoSync | null = null;

/**
 * Returns the shared Neo4jAutoSync, constructing it with default configuration the
 * first time it is requested.
 */
export function getNeo4jAutoSync(): Neo4jAutoSync {
  if (autoSyncInstance !== null) {
    return autoSyncInstance;
  }
  const created = new Neo4jAutoSync();
  autoSyncInstance = created;
  return created;
}