Spaces:
Paused
Paused
| /** | |
| * Neo4j Local to Cloud Sync Script | |
| * | |
| * Syncs data from local Neo4j (bruttopuljen/master) to AuraDB cloud | |
| * Run: npx tsx apps/backend/src/scripts/sync-neo4j-to-cloud.ts | |
| */ | |
| import neo4j, { Driver, Session, Record as Neo4jRecord, isNode, isRelationship } from 'neo4j-driver'; | |
| import dotenv from 'dotenv'; | |
| import path from 'path'; | |
// Load environment variables (including the NEO4J_* values read by `config`)
// from `.env.production`.
// NOTE(review): the path is resolved against the current working directory,
// so running the script from another directory loads a different (or no) file.
const envFilePath = path.resolve(process.cwd(), '.env.production');
dotenv.config({ path: envFilePath });
/** Connection settings for a single Neo4j endpoint. */
interface ConnectionConfig {
  uri: string;
  user: string;
  password: string;
  database: string;
}

/** Both ends of a sync run: the local source and the cloud target. */
interface SyncConfig {
  local: ConnectionConfig;
  cloud: ConnectionConfig;
}

/** A node exported from the local database. */
interface NodeData {
  labels: string[];
  properties: Record<string, any>;
  /** Element id of the node in the local database. */
  elementId: string;
}

/** A relationship exported from the local database. */
interface RelationshipData {
  type: string;
  properties: Record<string, any>;
  /** Local element id of the start node. */
  startElementId: string;
  /** Local element id of the end node. */
  endElementId: string;
}

/** One schema statement to replay on the cloud database. */
interface SchemaItem {
  type: 'constraint' | 'index';
  name: string;
  /** Complete Cypher statement that creates the item. */
  definition: string;
}
/**
 * Connection settings for both ends of the sync.
 *
 * Local settings were previously hard-coded (including the password). They can
 * now be overridden with LOCAL_NEO4J_URI / LOCAL_NEO4J_USER /
 * LOCAL_NEO4J_PASSWORD / LOCAL_NEO4J_DATABASE; the old values remain the
 * defaults.
 *
 * Cloud settings come from NEO4J_URI / NEO4J_USER / NEO4J_PASSWORD /
 * NEO4J_DATABASE. Note that the password falls back to an empty string when
 * NEO4J_PASSWORD is unset.
 *
 * `||` (not `??`) is used throughout so that an empty environment variable
 * also falls back to the default, matching the original cloud behaviour.
 */
const config: SyncConfig = {
  local: {
    uri: process.env.LOCAL_NEO4J_URI || 'bolt://localhost:7687',
    user: process.env.LOCAL_NEO4J_USER || 'neo4j',
    password: process.env.LOCAL_NEO4J_PASSWORD || 'password',
    database: process.env.LOCAL_NEO4J_DATABASE || 'neo4j'
  },
  cloud: {
    uri: process.env.NEO4J_URI || 'neo4j+s://054eff27.databases.neo4j.io',
    user: process.env.NEO4J_USER || 'neo4j',
    password: process.env.NEO4J_PASSWORD || '',
    database: process.env.NEO4J_DATABASE || 'neo4j'
  }
};
/**
 * Create a Neo4j driver for one endpoint using basic authentication.
 *
 * Pooled connections are retired after 3 hours, and waiting for a pooled
 * connection times out after 2 minutes.
 *
 * @param connectionConfig - URI and credentials of the endpoint
 * @returns a driver that has not yet been verified to be reachable
 */
async function createDriver(connectionConfig: SyncConfig['local'] | SyncConfig['cloud']): Promise<Driver> {
  const { uri, user, password } = connectionConfig;
  const threeHoursMs = 3 * 60 * 60 * 1000;
  const twoMinutesMs = 2 * 60 * 1000;

  return neo4j.driver(uri, neo4j.auth.basic(user, password), {
    maxConnectionLifetime: threeHoursMs,
    connectionAcquisitionTimeout: twoMinutesMs,
  });
}
/**
 * Read the local database's constraints and indexes and turn the supported
 * ones into idempotent `CREATE ... IF NOT EXISTS` statements.
 *
 * Exported:
 *  - node UNIQUENESS constraints (single or composite);
 *  - node RANGE indexes (single or composite);
 *  - node FULLTEXT indexes (all labels and properties; analyzer options are
 *    not copied);
 *  - node VECTOR indexes (dimensions and similarity function read from the
 *    index options, falling back to 384 / cosine when unavailable).
 *
 * Everything else is skipped: other constraint kinds, other index types, and
 * relationship indexes. Indexes owned by a constraint are also skipped,
 * because creating the constraint recreates them.
 *
 * @param session - open session on the local database
 * @returns constraint statements first, then index statements
 */
async function exportSchema(session: Session): Promise<SchemaItem[]> {
  console.log('π Exporting schema from local...');
  const schema: SchemaItem[] = [];

  // Backtick-quote schema names, labels and property keys so identifiers with
  // spaces, dashes or reserved words still produce valid Cypher.
  const quote = (identifier: string): string => `\`${identifier.replace(/`/g, '``')}\``;
  // Renders `n.a, n.b, ...` for a list of property keys.
  const propertyList = (properties: string[]): string =>
    properties.map((property) => `n.${quote(property)}`).join(', ');

  const constraintResult = await session.run('SHOW CONSTRAINTS');
  for (const record of constraintResult.records) {
    const name: string = record.get('name');
    const labels: string[] = record.get('labelsOrTypes') ?? [];
    const properties: string[] = record.get('properties') ?? [];
    const [label] = labels;

    if (record.get('type') !== 'UNIQUENESS' || record.get('entityType') !== 'NODE') continue;
    if (label === undefined || properties.length === 0) continue;

    // Composite constraints need the parenthesised form. Keeping only the first
    // property would create a different, stricter constraint.
    const target = properties.length === 1 ? propertyList(properties) : `(${propertyList(properties)})`;
    const definition = `CREATE CONSTRAINT ${quote(name)} IF NOT EXISTS FOR (n:${quote(label)}) REQUIRE ${target} IS UNIQUE`;
    schema.push({ type: 'constraint', name, definition });
    console.log(` β Constraint: ${name}`);
  }

  // YIELD * so that the `options` column (vector index config) is returned too.
  const indexResult = await session.run('SHOW INDEXES YIELD *');
  for (const record of indexResult.records) {
    const name: string = record.get('name');
    const type: string = record.get('type');
    const labels: string[] = record.get('labelsOrTypes') ?? [];
    const properties: string[] = record.get('properties') ?? [];
    const [label] = labels;

    // Skip constraint-backed indexes: the constraint recreates them.
    if (record.get('owningConstraint')) continue;
    // Only node indexes that target labels and properties are exported.
    if (record.get('entityType') !== 'NODE' || label === undefined || properties.length === 0) continue;

    let definition = '';
    if (type === 'RANGE') {
      definition = `CREATE INDEX ${quote(name)} IF NOT EXISTS FOR (n:${quote(label)}) ON (${propertyList(properties)})`;
    } else if (type === 'FULLTEXT') {
      definition = `CREATE FULLTEXT INDEX ${quote(name)} IF NOT EXISTS FOR (n:${labels.map(quote).join('|')}) ON EACH [${propertyList(properties)}]`;
    } else if (type === 'VECTOR') {
      const indexConfig: Record<string, any> =
        (record.has('options') ? record.get('options')?.indexConfig : undefined) ?? {};

      // Dimensions may arrive as a driver Integer or as a plain number.
      const rawDimensions = indexConfig['vector.dimensions'];
      const parsedDimensions =
        typeof rawDimensions?.toNumber === 'function' ? rawDimensions.toNumber() : Number(rawDimensions);
      const dimensions = Number.isInteger(parsedDimensions) && parsedDimensions > 0 ? parsedDimensions : 384;

      // Only accept a plain word here, since it is spliced into the statement.
      const rawSimilarity = indexConfig['vector.similarity_function'];
      const similarity =
        typeof rawSimilarity === 'string' && /^[a-z_]+$/i.test(rawSimilarity) ? rawSimilarity.toLowerCase() : 'cosine';

      definition = `CREATE VECTOR INDEX ${quote(name)} IF NOT EXISTS FOR (n:${quote(label)}) ON (${propertyList(properties.slice(0, 1))}) OPTIONS {indexConfig: {\`vector.dimensions\`: ${dimensions}, \`vector.similarity_function\`: '${similarity}'}}`;
    }

    if (definition) {
      schema.push({ type: 'index', name, definition });
      console.log(` β Index: ${name} (${type})`);
    }
  }

  console.log(` Total: ${schema.length} schema items`);
  return schema;
}
/**
 * Read every node from the local database.
 *
 * For each node it captures the labels (copied), the properties (each value
 * passed through `convertNeo4jValue`) and the local element id. Rows whose
 * `n` is not a driver Node are ignored. It logs the total and a per-label
 * count; a node with several labels is counted once per label.
 *
 * @param session - open session on the local database
 * @returns the exported nodes in result order
 */
async function exportNodes(session: Session): Promise<NodeData[]> {
  console.log('\nπ¦ Exporting nodes from local...');
  const { records } = await session.run('MATCH (n) RETURN n, elementId(n) as elementId');

  const nodes: NodeData[] = [];
  records.forEach((record) => {
    const candidate = record.get('n');
    const elementId = record.get('elementId');
    if (!isNode(candidate)) return;

    const properties: Record<string, any> = {};
    Object.entries(candidate.properties).forEach(([key, value]) => {
      properties[key] = convertNeo4jValue(value);
    });

    nodes.push({ labels: [...candidate.labels], properties, elementId });
  });

  console.log(` β Exported ${nodes.length} nodes`);

  // Per-label distribution, in first-seen order.
  const labelCounts: Record<string, number> = {};
  nodes.forEach(({ labels }) =>
    labels.forEach((label) => {
      labelCounts[label] = (labelCounts[label] || 0) + 1;
    })
  );
  Object.entries(labelCounts).forEach(([label, count]) => console.log(` - ${label}: ${count}`));

  return nodes;
}
/**
 * Read every relationship from the local database.
 *
 * Each relationship is recorded with its type, its properties (each value
 * passed through `convertNeo4jValue`) and the local element ids of its start
 * and end nodes. If the returned `r` is not a driver Relationship, it is kept
 * with an empty property map. Logs the total and a per-type count.
 *
 * @param session - open session on the local database
 * @returns the exported relationships in result order
 */
async function exportRelationships(session: Session): Promise<RelationshipData[]> {
  console.log('\nπ Exporting relationships from local...');
  const { records } = await session.run(`
    MATCH (a)-[r]->(b)
    RETURN r, type(r) as type, elementId(a) as startId, elementId(b) as endId
  `);

  const relationships: RelationshipData[] = records.map((record) => {
    const rel = record.get('r');
    const properties: Record<string, any> = {};
    if (isRelationship(rel)) {
      Object.entries(rel.properties).forEach(([key, value]) => {
        properties[key] = convertNeo4jValue(value);
      });
    }
    return {
      type: record.get('type'),
      properties,
      startElementId: record.get('startId'),
      endElementId: record.get('endId'),
    };
  });

  console.log(` β Exported ${relationships.length} relationships`);

  // Per-type distribution, in first-seen order.
  const typeCounts = relationships.reduce<Record<string, number>>((counts, { type }) => {
    counts[type] = (counts[type] || 0) + 1;
    return counts;
  }, {});
  Object.entries(typeCounts).forEach(([type, count]) => console.log(` - ${type}: ${count}`));

  return relationships;
}
/**
 * Prepare a property value read from the local database so it can be sent
 * back as a query parameter to the cloud database.
 *
 * Driver-native values are returned unchanged: Integer, temporal types
 * (DateTime, Date, ...) and spatial types. The driver serialises them back
 * as the same Cypher types.
 *
 * Converting them here would silently change the stored type in the cloud:
 *  - A JavaScript number is sent as Cypher FLOAT, so an INTEGER property would
 *    come back as FLOAT (and lose precision above 2^53).
 *  - A temporal value turned into a string would be stored as STRING.
 *
 * Lists are processed element-wise, and Float64Array values (e.g. embeddings)
 * become plain arrays.
 *
 * @param value - a property value as returned by the driver
 * @returns a value suitable for use as a query parameter
 */
function convertNeo4jValue(value: any): any {
  if (value === null || value === undefined) return value;

  if (Array.isArray(value)) {
    return value.map(convertNeo4jValue);
  }

  if (value instanceof Float64Array) {
    return Array.from(value);
  }

  // Integer, temporal, spatial and plain JS values all round-trip as-is.
  return value;
}
/**
 * Delete every node in the cloud database. DETACH DELETE also removes each
 * node's relationships.
 *
 * Nodes are deleted 1000 per query, and queries repeat until one deletes
 * nothing. Constraints and indexes are not touched.
 *
 * @param session - open session on the cloud database
 */
async function clearCloud(session: Session): Promise<void> {
  console.log('\nποΈ Clearing cloud database...');

  // Batched so that no single query has to delete the whole graph at once.
  const deleteBatchQuery = `
    MATCH (n)
    WITH n LIMIT 1000
    DETACH DELETE n
    RETURN count(*) as deleted
  `;

  for (;;) {
    const { records } = await session.run(deleteBatchQuery);
    const batchCount: number = records[0]?.get('deleted')?.toNumber() || 0;
    if (batchCount <= 0) break;
    console.log(` Deleted batch of ${batchCount} nodes...`);
  }

  console.log(' β Cloud database cleared');
}
/**
 * Run each schema statement against the cloud database, in order.
 *
 * If the error message mentions "already exists" or "equivalent", it is
 * reported as already present. Any other error is logged and the remaining
 * statements still run; nothing is thrown for a failed statement.
 *
 * @param session - open session on the cloud database
 * @param schema - statements produced from the local schema
 */
async function applySchema(session: Session, schema: SchemaItem[]): Promise<void> {
  console.log('\nπ Applying schema to cloud...');

  const looksAlreadyPresent = (message: any): boolean =>
    Boolean(message?.includes('already exists') || message?.includes('equivalent'));

  for (const { type, name, definition } of schema) {
    try {
      await session.run(definition);
      console.log(` β ${type}: ${name}`);
    } catch (error: any) {
      const message = error.message;
      if (looksAlreadyPresent(message)) {
        console.log(` β ${type} ${name} already exists`);
      } else {
        console.error(` β Failed to create ${type} ${name}: ${message}`);
      }
    }
  }
}
/**
 * Create every exported node in the cloud database. Each node's local element
 * id is mapped to the element id it received in the cloud.
 *
 * Nodes are created with CREATE, not MERGE. The cloud database is cleared
 * before this runs, and a MERGE keyed on labels + `id` would silently collapse
 * distinct local nodes that share both.
 *
 * Side effect: a node without an `id` property (null/undefined) gets a
 * generated `node_<timestamp>_<random>` id. The id is written back into
 * `node.properties`, and therefore stored in the cloud, so the node can be
 * addressed by `id` afterwards. Existing falsy ids such as 0 are kept.
 *
 * Failures are logged per node and do not abort the import.
 *
 * @param session - open session on the cloud database
 * @param nodes - nodes exported from the local database (mutated, see above)
 * @param elementIdMap - receives local elementId -> cloud elementId entries
 */
async function importNodes(session: Session, nodes: NodeData[], elementIdMap: Map<string, string>): Promise<void> {
  console.log('\nπ¦ Importing nodes to cloud...');

  // Backtick-quote labels so names with spaces, dashes or reserved words do
  // not break the generated Cypher.
  const quote = (identifier: string): string => `\`${identifier.replace(/`/g, '``')}\``;
  const batchSize = 100;
  let imported = 0;
  let failed = 0;

  for (let i = 0; i < nodes.length; i += batchSize) {
    for (const node of nodes.slice(i, i + batchSize)) {
      try {
        if (node.properties.id === null || node.properties.id === undefined) {
          node.properties.id = `node_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
        }

        // No labels -> `(n)`; the old `n:${labels.join(':')}` produced the invalid `(n:)`.
        const labelClause = node.labels.map((label) => `:${quote(label)}`).join('');
        const result = await session.run(
          `CREATE (n${labelClause}) SET n = $props RETURN elementId(n) AS newElementId`,
          { props: node.properties }
        );

        const newElementId = result.records[0]?.get('newElementId');
        if (newElementId) {
          elementIdMap.set(node.elementId, newElementId);
        }
        imported++;
      } catch (error: unknown) {
        failed++;
        const message = error instanceof Error ? error.message : String(error);
        console.error(` β Failed to import node: ${message.slice(0, 100)}`);
      }
    }
    console.log(` Progress: ${imported}/${nodes.length} nodes`);
  }

  console.log(` β Imported ${imported} nodes` + (failed > 0 ? ` (failed ${failed})` : ''));
}
/**
 * Create every exported relationship in the cloud database, between the cloud
 * copies of its start and end nodes.
 *
 * Endpoints are resolved through `elementIdMap` (local elementId -> cloud
 * elementId, recorded when the nodes were created) and matched with
 * `elementId(...)`. This places each relationship on exactly the nodes copied
 * from its local endpoints. Matching on the `id` property without a label
 * could hit several unrelated nodes that share an id value, and it scans all
 * nodes for every lookup.
 * NOTE(review): relies on the cloud element ids staying valid between node
 * creation and this step, i.e. nothing deletes those nodes in between.
 *
 * Relationships are created with CREATE, not MERGE, so that parallel
 * relationships of the same type between the same two nodes are all kept.
 *
 * A relationship is counted as skipped when:
 *  - either endpoint has no entry in `elementIdMap`, or
 *  - the endpoint lookup matches nothing.
 * Query errors are logged and do not abort the import.
 *
 * @param session - open session on the cloud database
 * @param relationships - relationships exported from the local database
 * @param elementIdMap - local elementId -> cloud elementId for imported nodes
 * @param nodes - unused; kept so existing call sites keep compiling
 */
async function importRelationships(
  session: Session,
  relationships: RelationshipData[],
  elementIdMap: Map<string, string>,
  nodes: NodeData[]
): Promise<void> {
  console.log('\nπ Importing relationships to cloud...');

  // Backtick-quote the relationship type so unusual type names stay valid Cypher.
  const quote = (identifier: string): string => `\`${identifier.replace(/`/g, '``')}\``;
  let imported = 0;
  let skipped = 0;

  for (const rel of relationships) {
    const startId = elementIdMap.get(rel.startElementId);
    const endId = elementIdMap.get(rel.endElementId);
    if (startId === undefined || endId === undefined) {
      skipped++;
      continue;
    }

    try {
      const result = await session.run(
        `MATCH (a) WHERE elementId(a) = $startId
         MATCH (b) WHERE elementId(b) = $endId
         CREATE (a)-[r:${quote(rel.type)}]->(b)
         SET r = $props
         RETURN elementId(r) AS newElementId`,
        { startId, endId, props: rel.properties }
      );
      // No returned row means an endpoint was not found and nothing was created.
      if (result.records.length > 0) {
        imported++;
      } else {
        skipped++;
      }
    } catch (error: unknown) {
      const message = error instanceof Error ? error.message : String(error);
      console.error(` β Failed to import relationship: ${message.slice(0, 100)}`);
    }
  }

  console.log(` β Imported ${imported} relationships (skipped ${skipped})`);
}
/**
 * Replace the contents of the cloud database with a copy of the local one.
 *
 * Steps:
 *  1. Connect to both databases.
 *  2. Export schema, nodes and relationships from local.
 *  3. Refuse to continue if the local export has no nodes.
 *  4. Clear the cloud database and apply the schema.
 *  5. Import nodes, then relationships.
 *  6. Compare the cloud node/relationship counts with the export and print a
 *     warning on mismatch.
 *
 * There is no interactive confirmation: the cloud data is replaced
 * unconditionally once the export is non-empty.
 *
 * Every session is closed in a `finally`, so a failure part-way through does
 * not leave sessions open. Both drivers are always closed.
 *
 * @throws any connection or query error, after logging its message; also
 *   throws when the local export contains no nodes, so an empty export never
 *   wipes the cloud database
 */
async function syncNeo4jToCloud(): Promise<void> {
  console.log('β'.repeat(60));
  console.log('π Neo4j Local β Cloud Sync');
  console.log('β'.repeat(60));
  console.log(`\nLocal: ${config.local.uri}`);
  console.log(`Cloud: ${config.cloud.uri}`);
  console.log('');

  // Run `work` on a fresh session and always close the session afterwards.
  const withSession = async <T>(
    driver: Driver,
    database: string,
    work: (session: Session) => Promise<T>
  ): Promise<T> => {
    const session = driver.session({ database });
    try {
      return await work(session);
    } finally {
      await session.close();
    }
  };

  let localDriver: Driver | null = null;
  let cloudDriver: Driver | null = null;
  try {
    console.log('π‘ Connecting to local Neo4j...');
    localDriver = await createDriver(config.local);
    await localDriver.verifyConnectivity();
    console.log(' β Connected to local');

    console.log('βοΈ Connecting to AuraDB cloud...');
    cloudDriver = await createDriver(config.cloud);
    await cloudDriver.verifyConnectivity();
    console.log(' β Connected to cloud');

    // Export from local (properties are evaluated in order: schema, nodes, relationships).
    const { schema, nodes, relationships } = await withSession(localDriver, config.local.database, async (session) => ({
      schema: await exportSchema(session),
      nodes: await exportNodes(session),
      relationships: await exportRelationships(session),
    }));

    console.log('\nβ οΈ This will REPLACE all data in cloud with local data!');
    console.log(` Nodes to sync: ${nodes.length}`);
    console.log(` Relationships to sync: ${relationships.length}`);

    // Safety net: an empty export (wrong local database, empty instance)
    // must never wipe the cloud database.
    if (nodes.length === 0) {
      throw new Error('Local export contains no nodes; refusing to clear the cloud database');
    }

    // Import to cloud.
    await withSession(cloudDriver, config.cloud.database, async (session) => {
      await clearCloud(session);
      await applySchema(session, schema);
      const elementIdMap = new Map<string, string>();
      await importNodes(session, nodes, elementIdMap);
      await importRelationships(session, relationships, elementIdMap, nodes);
    });

    // Verify sync.
    console.log('\nβ Verifying sync...');
    const { cloudNodeCount, cloudRelCount } = await withSession(cloudDriver, config.cloud.database, async (session) => {
      const nodeCountResult = await session.run('MATCH (n) RETURN count(n) as count');
      const relCountResult = await session.run('MATCH ()-[r]->() RETURN count(r) as count');
      return {
        cloudNodeCount: nodeCountResult.records[0]?.get('count')?.toNumber() || 0,
        cloudRelCount: relCountResult.records[0]?.get('count')?.toNumber() || 0,
      };
    });

    console.log(` Cloud nodes: ${cloudNodeCount} (expected: ${nodes.length})`);
    console.log(` Cloud relationships: ${cloudRelCount} (expected: ${relationships.length})`);
    if (cloudNodeCount !== nodes.length || cloudRelCount !== relationships.length) {
      console.warn('\nWARNING: cloud counts differ from the local export; check the errors logged above.');
    }

    console.log('\n' + 'β'.repeat(60));
    console.log('β SYNC COMPLETE');
    console.log('β'.repeat(60));
  } catch (error: unknown) {
    console.error('\nβ Sync failed:', error instanceof Error ? error.message : error);
    throw error;
  } finally {
    if (localDriver) await localDriver.close();
    if (cloudDriver) await cloudDriver.close();
  }
}
// Entry point: exit with status 0 when the sync succeeds and 1 when it fails.
// Errors raised inside syncNeo4jToCloud's try block have already been logged there.
void syncNeo4jToCloud().then(
  () => process.exit(0),
  () => process.exit(1)
);