widgettdc-api / apps /backend /src /scripts /sync-neo4j-to-cloud.ts
Kraft102's picture
Update backend source
34367da verified
/**
* Neo4j Local to Cloud Sync Script
*
* Syncs data from local Neo4j (bruttopuljen/master) to AuraDB cloud
* Run: npx tsx apps/backend/src/scripts/sync-neo4j-to-cloud.ts
*/
import neo4j, { Driver, Session, Record as Neo4jRecord, isNode, isRelationship } from 'neo4j-driver';
import dotenv from 'dotenv';
import path from 'path';
// Read connection settings from `.env.production`, resolved against the
// directory the script is launched from (process.cwd()).
dotenv.config({
  path: path.resolve(process.cwd(), '.env.production'),
});
/**
 * Connection settings for both ends of the sync. Both sides share the same
 * shape: bolt/neo4j URI, basic-auth credentials and the database name.
 */
interface SyncConfig {
  /** Instance the data is exported from. */
  local: { uri: string; user: string; password: string; database: string };
  /** Instance the data is imported into. */
  cloud: { uri: string; user: string; password: string; database: string };
}
/** In-memory snapshot of one exported node. */
interface NodeData {
  /** elementId the node had in the database it was exported from. */
  elementId: string;
  /** All labels carried by the node. */
  labels: Array<string>;
  /** Property map after driver values were converted for re-import. */
  properties: Record<string, any>;
}
/** In-memory snapshot of one exported relationship. */
interface RelationshipData {
  /** elementId (in the exporting database) of the node the relationship starts at. */
  startElementId: string;
  /** elementId (in the exporting database) of the node the relationship ends at. */
  endElementId: string;
  /** Relationship type name. */
  type: string;
  /** Property map after driver values were converted for re-import. */
  properties: Record<string, any>;
}
/** One schema object (constraint or index) together with the Cypher that recreates it. */
interface SchemaItem {
  /** Name of the constraint or index. */
  name: string;
  /** Which kind of schema object this is. */
  type: 'constraint' | 'index';
  /** Cypher statement that creates the object on the target. */
  definition: string;
}
/**
 * Connection settings for the sync.
 *
 * Every value can be overridden through the environment (loaded from
 * `.env.production` before this runs); the fallbacks are the values that were
 * previously hard-coded, so existing setups keep working unchanged.
 *
 * `||` is used on purpose instead of `??`: an empty variable such as
 * `NEO4J_LOCAL_URI=` should fall back to the default rather than produce an
 * empty URI.
 */
const config: SyncConfig = {
  local: {
    uri: process.env.NEO4J_LOCAL_URI || 'bolt://localhost:7687',
    user: process.env.NEO4J_LOCAL_USER || 'neo4j',
    // Set NEO4J_LOCAL_PASSWORD instead of editing this file when the local password differs.
    password: process.env.NEO4J_LOCAL_PASSWORD || 'password',
    database: process.env.NEO4J_LOCAL_DATABASE || 'neo4j',
  },
  cloud: {
    uri: process.env.NEO4J_URI || 'neo4j+s://054eff27.databases.neo4j.io',
    user: process.env.NEO4J_USER || 'neo4j',
    // No usable default for the cloud password: it has to come from the environment.
    password: process.env.NEO4J_PASSWORD || '',
    database: process.env.NEO4J_DATABASE || 'neo4j',
  },
};
/**
 * Builds a neo4j driver for one side of the sync using basic auth.
 *
 * @param connectionConfig - URI and credentials of the instance to connect to.
 * @returns The driver instance; this function does not itself verify connectivity.
 */
async function createDriver(connectionConfig: SyncConfig['local'] | SyncConfig['cloud']): Promise<Driver> {
  const { uri, user, password } = connectionConfig;
  const credentials = neo4j.auth.basic(user, password);
  const poolOptions = {
    // 3 hours, expressed in milliseconds.
    maxConnectionLifetime: 3 * 60 * 60 * 1000,
    // 2 minutes, expressed in milliseconds.
    connectionAcquisitionTimeout: 2 * 60 * 1000,
  };
  return neo4j.driver(uri, credentials, poolOptions);
}
/**
 * Reads constraints and indexes from the given session and turns the supported
 * ones into idempotent `CREATE ... IF NOT EXISTS` statements.
 *
 * Supported: node uniqueness constraints, and node RANGE / FULLTEXT / VECTOR
 * indexes that are not owned by a constraint. Everything else is skipped.
 *
 * Compared with the naive single-property form this:
 *  - keeps every property of composite constraints and composite RANGE indexes,
 *  - keeps every label of multi-label FULLTEXT indexes,
 *  - takes VECTOR dimensions / similarity from the index options when the
 *    server reports them (falling back to 384 / cosine otherwise),
 *  - skips VECTOR indexes that are not on nodes instead of emitting node syntax,
 *  - backtick-quotes names, labels and properties so unusual identifiers
 *    cannot break (or alter) the generated statement.
 *
 * @param session - Session on the database whose schema is exported.
 * @returns The generated schema items, constraints first, then indexes.
 */
async function exportSchema(session: Session): Promise<SchemaItem[]> {
  console.log('πŸ“‹ Exporting schema from local...');

  // Cypher identifier quoting: wrap in backticks and double any embedded backtick.
  const quote = (identifier: unknown): string => '`' + String(identifier).replace(/`/g, '``') + '`';
  const propertyRefs = (properties: unknown): string[] =>
    Array.isArray(properties) ? properties.map((property) => `n.${quote(property)}`) : [];
  // Same fallback label as before when the server reports none.
  const labelList = (labelsOrTypes: unknown): string[] =>
    Array.isArray(labelsOrTypes) && labelsOrTypes.length > 0
      ? labelsOrTypes.map((label) => quote(label))
      : [quote('Unknown')];

  const schema: SchemaItem[] = [];

  // Export constraints
  const constraintResult = await session.run('SHOW CONSTRAINTS');
  for (const record of constraintResult.records) {
    const name = record.get('name');
    const type = record.get('type');
    const entityType = record.get('entityType');
    const labelsOrTypes = record.get('labelsOrTypes');
    const properties = record.get('properties');

    let definition = '';
    if (type === 'UNIQUENESS' && entityType === 'NODE') {
      const [label] = labelList(labelsOrTypes);
      const refs = propertyRefs(properties);
      // Composite constraints need the parenthesised tuple form.
      const target = refs.length > 1 ? `(${refs.join(', ')})` : (refs[0] ?? `n.${quote('id')}`);
      definition = `CREATE CONSTRAINT ${quote(name)} IF NOT EXISTS FOR (n:${label}) REQUIRE ${target} IS UNIQUE`;
    }

    if (definition) {
      schema.push({ type: 'constraint', name, definition });
      console.log(` βœ“ Constraint: ${name}`);
    }
  }

  // Export indexes (excluding constraint-backed ones). YIELD * is required to
  // get the `options` column that carries the vector index configuration.
  const indexResult = await session.run('SHOW INDEXES YIELD *');
  for (const record of indexResult.records) {
    const name = record.get('name');
    const type = record.get('type');
    const entityType = record.get('entityType');
    const labelsOrTypes = record.get('labelsOrTypes');
    const properties = record.get('properties');
    const owningConstraint = record.get('owningConstraint');

    // Constraint-backed indexes are recreated implicitly by their constraint.
    if (owningConstraint) continue;

    let definition = '';
    if (type === 'RANGE' && entityType === 'NODE') {
      const [label] = labelList(labelsOrTypes);
      const refs = propertyRefs(properties);
      const target = refs.length > 0 ? refs.join(', ') : `n.${quote('id')}`;
      definition = `CREATE INDEX ${quote(name)} IF NOT EXISTS FOR (n:${label}) ON (${target})`;
    } else if (type === 'FULLTEXT' && entityType === 'NODE') {
      const labels = labelList(labelsOrTypes).join('|');
      const refs = propertyRefs(properties).join(', ');
      definition = `CREATE FULLTEXT INDEX ${quote(name)} IF NOT EXISTS FOR (n:${labels}) ON EACH [${refs}]`;
    } else if (type === 'VECTOR' && entityType === 'NODE') {
      const [label] = labelList(labelsOrTypes);
      const ref = propertyRefs(properties)[0] ?? `n.${quote('embedding')}`;

      // `options` is only present when the server yields it; guard the lookup
      // because Record.get throws for unknown keys.
      const options = record.has('options') ? record.get('options') : undefined;
      const indexConfig = options?.indexConfig;
      const rawDimensions = indexConfig?.['vector.dimensions'];
      // The driver may hand integers back as Integer objects.
      const parsedDimensions = Number(
        typeof rawDimensions?.toNumber === 'function' ? rawDimensions.toNumber() : rawDimensions
      );
      const dimensions = Number.isInteger(parsedDimensions) && parsedDimensions > 0 ? parsedDimensions : 384;
      const rawSimilarity = indexConfig?.['vector.similarity_function'];
      // Only plain identifiers are interpolated into the quoted option value.
      const similarity =
        typeof rawSimilarity === 'string' && /^[A-Za-z_]+$/.test(rawSimilarity) ? rawSimilarity : 'cosine';

      definition = `CREATE VECTOR INDEX ${quote(name)} IF NOT EXISTS FOR (n:${label}) ON (${ref}) OPTIONS {indexConfig: {\`vector.dimensions\`: ${dimensions}, \`vector.similarity_function\`: '${similarity}'}}`;
    }

    if (definition) {
      schema.push({ type: 'index', name, definition });
      console.log(` βœ“ Index: ${name} (${type})`);
    }
  }

  console.log(` Total: ${schema.length} schema items`);
  return schema;
}
/**
 * Loads every node reachable through the session into memory.
 *
 * Property values are passed through `convertNeo4jValue`; results that are not
 * driver Node objects are ignored. A per-label count is printed afterwards.
 *
 * @param session - Session on the database to export from.
 * @returns One entry per exported node, carrying its original elementId.
 */
async function exportNodes(session: Session): Promise<NodeData[]> {
  console.log('\nπŸ“¦ Exporting nodes from local...');

  const { records } = await session.run('MATCH (n) RETURN n, elementId(n) as elementId');
  const nodes: NodeData[] = [];

  records.forEach((row) => {
    const candidate = row.get('n');
    const elementId = row.get('elementId');
    if (!isNode(candidate)) {
      return;
    }

    // Copy the property map so converted values never alias the driver's object.
    const properties: Record<string, any> = {};
    Object.entries(candidate.properties).forEach(([key, value]) => {
      properties[key] = convertNeo4jValue(value);
    });

    nodes.push({ labels: [...candidate.labels], properties, elementId });
  });

  console.log(` βœ“ Exported ${nodes.length} nodes`);

  // Label histogram, purely informational.
  const labelCounts: Record<string, number> = {};
  nodes
    .flatMap((exported) => exported.labels)
    .forEach((label) => {
      labelCounts[label] = (labelCounts[label] || 0) + 1;
    });
  Object.entries(labelCounts).forEach(([label, count]) => {
    console.log(` - ${label}: ${count}`);
  });

  return nodes;
}
/**
 * Loads every relationship reachable through the session into memory.
 *
 * Each entry records the relationship type, its converted properties and the
 * elementIds of its start and end nodes. A per-type count is printed afterwards.
 *
 * @param session - Session on the database to export from.
 * @returns One entry per matched relationship.
 */
async function exportRelationships(session: Session): Promise<RelationshipData[]> {
  console.log('\nπŸ”— Exporting relationships from local...');

  const { records } = await session.run(`
MATCH (a)-[r]->(b)
RETURN r, type(r) as type, elementId(a) as startId, elementId(b) as endId
`);

  const relationships: RelationshipData[] = [];
  records.forEach((row) => {
    const candidate = row.get('r');
    const type = row.get('type');
    const startId = row.get('startId');
    const endId = row.get('endId');

    // Properties stay empty when the value is not a driver Relationship; the
    // entry itself is still recorded.
    const properties: Record<string, any> = {};
    if (isRelationship(candidate)) {
      Object.entries(candidate.properties).forEach(([key, value]) => {
        properties[key] = convertNeo4jValue(value);
      });
    }

    relationships.push({ type, properties, startElementId: startId, endElementId: endId });
  });

  console.log(` βœ“ Exported ${relationships.length} relationships`);

  // Type histogram, purely informational.
  const typeCounts: Record<string, number> = {};
  relationships.forEach((exported) => {
    typeCounts[exported.type] = (typeCounts[exported.type] || 0) + 1;
  });
  Object.entries(typeCounts).forEach(([type, count]) => {
    console.log(` - ${type}: ${count}`);
  });

  return relationships;
}
/**
 * Converts a value read from the driver into something that can be sent back
 * as a query parameter.
 *
 * - `null` / `undefined` and primitives are returned untouched.
 * - Arrays are converted element by element.
 * - Objects are dispatched on their constructor name:
 *   - `Integer`  -> JS number when it fits the safe-integer range; otherwise the
 *     Integer object is returned as-is, because `toNumber()` would silently
 *     change the value.
 *   - `DateTime` / `Date` -> their `toString()` form.
 *   - `Float64Array` -> plain number array.
 *   - anything else is returned untouched.
 *
 * NOTE(review): plain JS numbers are presumably written back by the driver as
 * floats, so safe-range integers may change type on the target - confirm
 * whether integer typing matters for this data set.
 *
 * @param value - Any property value (or nested array element).
 * @returns The converted value.
 */
function convertNeo4jValue(value: any): any {
  if (value === null || value === undefined) return value;

  if (Array.isArray(value)) {
    return value.map((item) => convertNeo4jValue(item));
  }

  if (typeof value !== 'object') return value;

  switch (value.constructor?.name) {
    case 'Integer':
      // Outside +/-(2^53 - 1) a JS number cannot represent the value exactly;
      // keep the lossless Integer instead of corrupting the data.
      if (typeof value.inSafeRange === 'function' && !value.inSafeRange()) {
        return value;
      }
      return value.toNumber();
    case 'DateTime':
    case 'Date':
      return value.toString();
    case 'Float64Array':
      // Embeddings: turn the typed array into a plain array.
      return Array.from(value);
    default:
      return value;
  }
}
/**
 * Removes every node (and, through DETACH DELETE, its relationships) from the
 * database behind the session.
 *
 * Deletion runs in rounds of at most 1000 nodes until a round reports zero
 * deletions, so a single statement never has to cover the whole graph.
 *
 * @param session - Session on the database to wipe.
 */
async function clearCloud(session: Session): Promise<void> {
  console.log('\nπŸ—‘οΈ Clearing cloud database...');

  for (;;) {
    const { records } = await session.run(`
MATCH (n)
WITH n LIMIT 1000
DETACH DELETE n
RETURN count(*) as deleted
`);
    const removed: number = records[0]?.get('deleted')?.toNumber() || 0;
    if (!(removed > 0)) {
      break;
    }
    console.log(` Deleted batch of ${removed} nodes...`);
  }

  console.log(' βœ“ Cloud database cleared');
}
/**
 * Runs each schema statement against the session, one at a time.
 *
 * Failures never abort the run: errors whose message mentions "already exists"
 * or "equivalent" are reported as warnings, anything else as an error, and
 * processing continues with the next item.
 *
 * @param session - Session on the database receiving the schema.
 * @param schema - Statements to apply, in order.
 */
async function applySchema(session: Session, schema: SchemaItem[]): Promise<void> {
  console.log('\nπŸ“‹ Applying schema to cloud...');

  for (const item of schema) {
    try {
      await session.run(item.definition);
      console.log(` βœ“ ${item.type}: ${item.name}`);
    } catch (error: any) {
      const alreadyPresent = error.message?.includes('already exists') || error.message?.includes('equivalent');
      if (!alreadyPresent) {
        console.error(` βœ— Failed to create ${item.type} ${item.name}: ${error.message}`);
        continue;
      }
      console.log(` ⚠ ${item.type} ${item.name} already exists`);
    }
  }
}
/**
 * Writes the exported nodes to the target database, one MERGE per node keyed on
 * the node's labels plus its `id` property.
 *
 * - Nodes without an `id` (missing, null or empty string) get a generated one,
 *   which is also written back onto `node.properties.id` so relationship
 *   import can refer to it. A falsy but real id such as `0` is kept.
 * - Labels are backtick-quoted, and a node without labels is merged without a
 *   label clause (an empty label list used to yield invalid Cypher).
 * - Each created node's new elementId is recorded in `elementIdMap` under the
 *   node's original elementId.
 * - A failing node is logged and skipped; the import carries on.
 *
 * @param session - Session on the database receiving the nodes.
 * @param nodes - Nodes to import; `properties.id` may be filled in as a side effect.
 * @param elementIdMap - Output map: original elementId -> elementId on the target.
 */
async function importNodes(session: Session, nodes: NodeData[], elementIdMap: Map<string, string>): Promise<void> {
  console.log('\nπŸ“¦ Importing nodes to cloud...');

  // Cypher identifier quoting: wrap in backticks and double any embedded backtick.
  const quoteLabel = (label: string): string => '`' + String(label).replace(/`/g, '``') + '`';

  let imported = 0;
  // Nodes are still written one by one; the batch only paces the progress output.
  const batchSize = 100;

  for (let i = 0; i < nodes.length; i += batchSize) {
    const batch = nodes.slice(i, i + batchSize);

    for (const node of batch) {
      try {
        // ":`A`:`B`" for labelled nodes, empty string for unlabelled ones.
        const labelClause = node.labels.map((label) => `:${quoteLabel(label)}`).join('');

        const existingId = node.properties.id;
        const hasUsableId = existingId !== undefined && existingId !== null && existingId !== '';
        const nodeId = hasUsableId
          ? existingId
          : `node_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
        node.properties.id = nodeId;

        const query = `
          MERGE (n${labelClause} {id: $nodeId})
          SET n = $props
          RETURN elementId(n) as newElementId
        `;
        const result = await session.run(query, {
          nodeId,
          props: node.properties,
        });

        const newElementId = result.records[0]?.get('newElementId');
        if (newElementId) {
          elementIdMap.set(node.elementId, newElementId);
        }
        imported++;
      } catch (error: any) {
        console.error(` βœ— Failed to import node: ${error.message?.slice(0, 100)}`);
      }
    }

    console.log(` Progress: ${imported}/${nodes.length} nodes`);
  }

  console.log(` βœ“ Imported ${imported} nodes`);
}
/**
 * Recreates the exported relationships on the target database.
 *
 * Endpoints are resolved through `elementIdMap` (original elementId ->
 * elementId on the target) and matched with `elementId(...)`, so each
 * relationship is attached to exactly the node that was created for it. Only
 * when an endpoint has no entry in the map does the lookup fall back to
 * matching on the node's `id` property, taken from `nodes`.
 *
 * Matching on `id` alone without a label would hit every node sharing that id
 * regardless of label, which is why the map takes precedence.
 *
 * NOTE(review): the map entries are elementIds returned earlier in the same
 * run; this assumes they still identify the same nodes when used here - confirm
 * against the elementId() documentation for the server version in use.
 *
 * NOTE(review): MERGE collapses several same-typed relationships between the
 * same two nodes into one - confirm the source data has no such duplicates.
 *
 * Relationships whose endpoints cannot be resolved either way are counted as
 * skipped; a failing statement is logged and the import continues.
 *
 * @param session - Session on the database receiving the relationships.
 * @param relationships - Relationships to import.
 * @param elementIdMap - Original elementId -> elementId on the target.
 * @param nodes - Exported nodes, used for the `id`-property fallback.
 */
async function importRelationships(
  session: Session,
  relationships: RelationshipData[],
  elementIdMap: Map<string, string>,
  nodes: NodeData[]
): Promise<void> {
  console.log('\nπŸ”— Importing relationships to cloud...');

  // Fallback lookup: original elementId -> `id` property of that node.
  const elementIdToNodeId = new Map<string, unknown>();
  for (const node of nodes) {
    elementIdToNodeId.set(node.elementId, node.properties.id);
  }

  // Cypher identifier quoting: wrap in backticks and double any embedded backtick.
  const quoteType = (relType: string): string => '`' + String(relType).replace(/`/g, '``') + '`';

  // Builds the MATCH clause and parameter value for one endpoint, or null when
  // the endpoint cannot be located on the target.
  const resolveEndpoint = (
    variable: 'a' | 'b',
    param: 'startId' | 'endId',
    originalElementId: string
  ): { match: string; value: unknown } | null => {
    const targetElementId = elementIdMap.get(originalElementId);
    if (targetElementId !== undefined) {
      return { match: `MATCH (${variable}) WHERE elementId(${variable}) = $${param}`, value: targetElementId };
    }
    const nodeId = elementIdToNodeId.get(originalElementId);
    if (nodeId === undefined || nodeId === null || nodeId === '') {
      return null;
    }
    return { match: `MATCH (${variable} {id: $${param}})`, value: nodeId };
  };

  let imported = 0;
  let skipped = 0;

  for (const rel of relationships) {
    try {
      const start = resolveEndpoint('a', 'startId', rel.startElementId);
      const end = resolveEndpoint('b', 'endId', rel.endElementId);
      if (!start || !end) {
        skipped++;
        continue;
      }

      const query = `
        ${start.match}
        ${end.match}
        MERGE (a)-[r:${quoteType(rel.type)}]->(b)
        SET r = $props
      `;
      await session.run(query, {
        startId: start.value,
        endId: end.value,
        props: rel.properties,
      });
      imported++;
    } catch (error: any) {
      console.error(` βœ— Failed to import relationship: ${error.message?.slice(0, 100)}`);
    }
  }

  console.log(` βœ“ Imported ${imported} relationships (skipped ${skipped})`);
}
/**
 * Runs the whole sync: connect to both instances, export schema, nodes and
 * relationships from local, wipe the cloud database, re-apply the schema,
 * import the data and finally print cloud counts next to the expected ones.
 *
 * The cloud database is cleared WITHOUT asking for confirmation; only a
 * warning is printed beforehand.
 *
 * Every session is closed in a `finally`, and both drivers are closed even if
 * closing the first one fails, so an error in any phase does not leave
 * sessions or drivers open.
 *
 * @throws Re-throws whatever error aborted the sync, after logging its message.
 */
async function syncNeo4jToCloud(): Promise<void> {
  console.log('═'.repeat(60));
  console.log('πŸ”„ Neo4j Local β†’ Cloud Sync');
  console.log('═'.repeat(60));
  console.log(`\nLocal: ${config.local.uri}`);
  console.log(`Cloud: ${config.cloud.uri}`);
  console.log('');

  let localDriver: Driver | null = null;
  let cloudDriver: Driver | null = null;

  try {
    // Connect to local
    console.log('πŸ“‘ Connecting to local Neo4j...');
    localDriver = await createDriver(config.local);
    await localDriver.verifyConnectivity();
    console.log(' βœ“ Connected to local');

    // Connect to cloud
    console.log('☁️ Connecting to AuraDB cloud...');
    cloudDriver = await createDriver(config.cloud);
    await cloudDriver.verifyConnectivity();
    console.log(' βœ“ Connected to cloud');

    // Export from local
    let schema: SchemaItem[];
    let nodes: NodeData[];
    let relationships: RelationshipData[];
    const localSession = localDriver.session({ database: config.local.database });
    try {
      schema = await exportSchema(localSession);
      nodes = await exportNodes(localSession);
      relationships = await exportRelationships(localSession);
    } finally {
      await localSession.close();
    }

    // Warn before clearing cloud (informational only - nothing is awaited here)
    console.log('\n⚠️ This will REPLACE all data in cloud with local data!');
    console.log(` Nodes to sync: ${nodes.length}`);
    console.log(` Relationships to sync: ${relationships.length}`);

    // Import to cloud
    const cloudSession = cloudDriver.session({ database: config.cloud.database });
    try {
      await clearCloud(cloudSession);
      await applySchema(cloudSession, schema);
      const elementIdMap = new Map<string, string>();
      await importNodes(cloudSession, nodes, elementIdMap);
      await importRelationships(cloudSession, relationships, elementIdMap, nodes);
    } finally {
      await cloudSession.close();
    }

    // Verify sync
    console.log('\nβœ… Verifying sync...');
    let cloudNodeCount = 0;
    let cloudRelCount = 0;
    const verifySession = cloudDriver.session({ database: config.cloud.database });
    try {
      const nodeCountResult = await verifySession.run('MATCH (n) RETURN count(n) as count');
      cloudNodeCount = nodeCountResult.records[0]?.get('count')?.toNumber() || 0;
      const relCountResult = await verifySession.run('MATCH ()-[r]->() RETURN count(r) as count');
      cloudRelCount = relCountResult.records[0]?.get('count')?.toNumber() || 0;
    } finally {
      await verifySession.close();
    }

    console.log(` Cloud nodes: ${cloudNodeCount} (expected: ${nodes.length})`);
    console.log(` Cloud relationships: ${cloudRelCount} (expected: ${relationships.length})`);
    console.log('\n' + '═'.repeat(60));
    console.log('βœ… SYNC COMPLETE');
    console.log('═'.repeat(60));
  } catch (error: any) {
    console.error('\n❌ Sync failed:', error.message);
    throw error;
  } finally {
    // Nested so the cloud driver is closed even when closing the local one throws.
    try {
      if (localDriver) await localDriver.close();
    } finally {
      if (cloudDriver) await cloudDriver.close();
    }
  }
}
// Entry point: run the sync and map its outcome onto the process exit code
// (0 when it resolves, 1 when it rejects).
void (async () => {
  try {
    await syncNeo4jToCloud();
    process.exit(0);
  } catch {
    process.exit(1);
  }
})();