brainstorm-ui / scripts /migrate-data.js
hsila's picture
Migrate from JSONL files to Supabase database with secure RLS policies
2451d08
#!/usr/bin/env node
import { createClient } from '@supabase/supabase-js';
import { readFileSync } from 'fs';
import { fileURLToPath } from 'url';
import { dirname, join } from 'path';
import dotenv from 'dotenv';
// Populate process.env from a local .env file before any setting is read
dotenv.config();

// ES modules do not define __filename/__dirname, so derive them from the module URL
const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);

// Supabase connection settings are taken from the environment
const { SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY } = process.env;

// Stop immediately when either credential is absent or empty
const credentialsPresent = Boolean(SUPABASE_URL && SUPABASE_SERVICE_ROLE_KEY);
if (!credentialsPresent) {
  const report = [
    '❌ Missing required environment variables:',
    ' - SUPABASE_URL',
    ' - SUPABASE_SERVICE_ROLE_KEY',
    '\nPlease check your .env file and try again.',
  ];
  for (const line of report) {
    console.error(line);
  }
  process.exit(1);
}

// Shared Supabase client, authenticated with the service role key
const supabase = createClient(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY);

// Migration tuning: rows per concurrent batch and retries per row
const BATCH_SIZE = 100;
const MAX_RETRIES = 3;
/**
 * Migrates task records from a JSONL file into one Supabase table.
 *
 * Rows are upserted one request per record, in concurrent batches of
 * BATCH_SIZE, and each record is retried up to MAX_RETRIES times.
 * Running counters are kept in `this.stats`.
 */
class DataMigrator {
  /**
   * @param {string} tableName - Supabase table that receives the rows.
   * @param {string} taskType - Label for this run; used in log output.
   */
  constructor(tableName, taskType) {
    this.tableName = tableName;
    this.taskType = taskType;
    this.stats = {
      total: 0,
      processed: 0,
      inserted: 0,
      updated: 0,
      skipped: 0,
      errors: 0
    };
  }

  /**
   * Reads a JSONL file and parses one JSON value per non-blank line.
   *
   * Lines that are not valid JSON are reported with a warning and skipped,
   * as are lines whose value is the JSON literal `null`.
   *
   * @param {string} filePath - Path of the JSONL file.
   * @returns {Promise<Array>} Parsed records in file order.
   * @throws Rethrows any error raised while reading the file.
   */
  async loadJsonlFile(filePath) {
    try {
      console.log(`πŸ“„ Reading JSONL file: ${filePath}`);
      // A UTF-8 byte-order mark survives decoding as U+FEFF. JSON.parse does
      // not treat it as whitespace, so without this the first record of a
      // BOM-prefixed file would be discarded as invalid JSON.
      const content = readFileSync(filePath, 'utf-8').replace(/^\uFEFF/, '');
      const records = content
        .split(/\r?\n/)
        .filter(line => line.trim().length > 0)
        .map(line => {
          try {
            return JSON.parse(line);
          } catch (parseError) {
            console.warn(`⚠️ Skipping invalid JSON line: ${line.substring(0, 50)}...`);
            return null;
          }
        })
        .filter(record => record !== null);
      console.log(`βœ… Loaded ${records.length} records from JSONL file`);
      return records;
    } catch (error) {
      console.error(`❌ Error reading JSONL file: ${error.message}`);
      throw error;
    }
  }

  /**
   * Maps a parsed JSONL record onto the row shape sent to Supabase.
   *
   * `id` and `task_type` are copied as-is; every other field falls back to
   * `null` when it is missing or falsy. `updated_at` is set to the current
   * time in ISO-8601 form.
   *
   * @param {object} record - Parsed JSONL record.
   * @returns {object} Row payload for the upsert.
   */
  transformRecord(record) {
    return {
      id: record.id,
      task_type: record.task_type,
      task_description: record.task_description || null,
      rephrased_task: record.rephrased_task || null,
      vocabulary: record.vocabulary || null,
      response: record.response || null,
      response_alternative: record.response_alternative || null,
      brainstorm: record.brainstorm || null,
      brainstorm_alternative: record.brainstorm_alternative || null,
      updated_at: new Date().toISOString()
    };
  }

  /**
   * Upserts a single record, retrying with a linearly growing delay.
   *
   * Never rejects: a record that still fails after MAX_RETRIES retries is
   * counted in `stats.errors` and reported through the return value.
   *
   * @param {object} record - Parsed JSONL record.
   * @param {number} [retryCount=0] - Retries already spent on this record.
   * @returns {Promise<{success: boolean, data?: any, error?: string}>}
   */
  async upsertRecord(record, retryCount = 0) {
    try {
      const transformedRecord = this.transformRecord(record);
      // Upsert so that an existing (id, task_type) row is overwritten
      const { data, error, status } = await supabase
        .from(this.tableName)
        .upsert(transformedRecord, {
          onConflict: 'id,task_type',
          ignoreDuplicates: false
        })
        .select();
      if (error) {
        throw error;
      }
      // NOTE(review): this assumes the API answers 201 for a new row and 200
      // for an overwritten one; confirm against the deployed PostgREST
      // version, otherwise the inserted/updated counters are not reliable.
      if (status === 201) {
        this.stats.inserted++;
        console.log(`βœ… Inserted record: ${record.id}-${record.task_type}`);
      } else if (status === 200) {
        this.stats.updated++;
        console.log(`πŸ”„ Updated record: ${record.id}-${record.task_type}`);
      } else {
        this.stats.skipped++;
        console.log(`⏭️ Skipped record: ${record.id}-${record.task_type}`);
      }
      return { success: true, data };
    } catch (error) {
      if (retryCount < MAX_RETRIES) {
        console.log(`πŸ”„ Retrying record ${record.id}-${record.task_type} (attempt ${retryCount + 1}/${MAX_RETRIES})`);
        // Wait 1s, 2s, 3s, ... before the next attempt
        await new Promise(resolve => setTimeout(resolve, 1000 * (retryCount + 1)));
        return this.upsertRecord(record, retryCount + 1);
      }
      this.stats.errors++;
      console.error(`❌ Failed to process record ${record.id}-${record.task_type}: ${error.message}`);
      return { success: false, error: error.message };
    }
  }

  /**
   * Upserts one slice of at most BATCH_SIZE records concurrently.
   *
   * @param {Array} records - All records being migrated.
   * @param {number} startIndex - Index of the first record in this batch.
   * @returns {Promise<number>} Index one past the last record processed.
   */
  async processBatch(records, startIndex) {
    const endIndex = Math.min(startIndex + BATCH_SIZE, records.length);
    const batch = records.slice(startIndex, endIndex);
    console.log(`\nπŸ“¦ Processing batch ${Math.floor(startIndex / BATCH_SIZE) + 1}: records ${startIndex + 1}-${endIndex}`);
    const promises = batch.map(record => this.upsertRecord(record));
    const results = await Promise.allSettled(promises);
    const successful = results.filter(r => r.status === 'fulfilled' && r.value.success).length;
    // The `rejected` test short-circuits, so `r.value` is only read when set
    const failed = results.filter(r => r.status === 'rejected' || !r.value.success).length;
    console.log(`πŸ“Š Batch completed: ${successful} successful, ${failed} failed`);
    return endIndex;
  }

  /**
   * Loads a JSONL file and upserts all of its records batch by batch.
   *
   * @param {string} filePath - Path of the JSONL file.
   * @returns {Promise<object>} The `stats` counters after the run.
   * @throws Rethrows any error raised while loading or processing.
   */
  async migrateFile(filePath) {
    try {
      console.log(`πŸš€ Starting migration for ${this.taskType} tasks...\n`);
      const records = await this.loadJsonlFile(filePath);
      this.stats.total = records.length;
      if (records.length === 0) {
        console.log('⚠️ No records to migrate');
        return this.stats;
      }
      // Batches run one after another; rows inside a batch run concurrently
      let currentIndex = 0;
      while (currentIndex < records.length) {
        currentIndex = await this.processBatch(records, currentIndex);
        this.stats.processed = currentIndex;
        const progress = ((currentIndex / records.length) * 100).toFixed(1);
        console.log(`πŸ“ˆ Progress: ${progress}% (${currentIndex}/${records.length})`);
      }
      console.log('\nβœ… Migration completed!');
      this.printStats();
      return this.stats;
    } catch (error) {
      console.error(`❌ Migration failed: ${error.message}`);
      throw error;
    }
  }

  /**
   * Prints the counters collected so far, plus the share of records that
   * were inserted or updated.
   */
  printStats() {
    const { total, inserted, updated } = this.stats;
    // With no records the ratio is 0/0; report 0.0% instead of NaN%
    const successRate = total > 0 ? ((inserted + updated) / total) * 100 : 0;
    console.log('\nπŸ“Š Migration Statistics:');
    console.log(` Total records: ${this.stats.total}`);
    console.log(` Processed: ${this.stats.processed}`);
    console.log(` Inserted: ${this.stats.inserted}`);
    console.log(` Updated: ${this.stats.updated}`);
    console.log(` Skipped: ${this.stats.skipped}`);
    console.log(` Errors: ${this.stats.errors}`);
    console.log(` Success rate: ${successRate.toFixed(1)}%`);
  }

  /**
   * Asks Supabase for the exact row count of the target table and logs it.
   *
   * @returns {Promise<number>} Row count reported for the table.
   * @throws Rethrows the query error when the count request fails.
   */
  async verifyMigration() {
    try {
      console.log(`\nπŸ” Verifying migration for table: ${this.tableName}`);
      // head: true requests the count without returning any rows
      const { count, error } = await supabase
        .from(this.tableName)
        .select('*', { count: 'exact', head: true });
      if (error) {
        throw error;
      }
      console.log(`βœ… Table ${this.tableName} contains ${count} records`);
      return count;
    } catch (error) {
      console.error(`❌ Verification failed: ${error.message}`);
      throw error;
    }
  }
}
/**
 * Command-line entry point.
 *
 * Parses `--file=`, `--type=`, `--dry-run` and `--help`/`-h` from
 * process.argv, validates them, then either counts the records (dry run)
 * or migrates the file and verifies the table row count. Exits the process
 * with status 1 on invalid arguments, an unreadable file, or a failed
 * migration, and with status 0 after printing help.
 *
 * @returns {Promise<void>}
 */
async function main() {
  console.log(' Starting data migration process...\n');
  const args = process.argv.slice(2);

  // Returns the text after `--name=`, or null when the option is absent.
  // Slicing off the prefix (rather than splitting on "=") keeps values that
  // themselves contain "=", such as a path like data/a=b.jsonl, intact.
  const optionValue = (name) => {
    const prefix = `--${name}=`;
    const match = args.find(arg => arg.startsWith(prefix));
    return match === undefined ? null : match.slice(prefix.length);
  };

  // Parse command line arguments
  const flags = {
    help: args.includes('--help') || args.includes('-h'),
    dryRun: args.includes('--dry-run'),
    file: optionValue('file'),
    type: optionValue('type')
  };

  // Show help
  if (flags.help) {
    console.log(`
πŸ“‹ Usage: node migrate-data.js [options]
Required:
--file=<path> Path to JSONL file to migrate
--type=<type> Type of tasks: 'speaking' or 'writing'
Options:
--dry-run Show what would be migrated without doing it
--help, -h Show this help message
Examples:
node migrate-data.js --file=data/speaking01.jsonl --type=speaking
node migrate-data.js --file=data/writing.jsonl --type=writing
node migrate-data.js --file=data/speaking01.jsonl --type=speaking --dry-run
`);
    process.exit(0);
  }

  // Validate required arguments (an empty value counts as missing)
  if (!flags.file || !flags.type) {
    console.error('❌ Missing required arguments:');
    if (!flags.file) console.error(' --file=<path> (required)');
    if (!flags.type) console.error(' --type=<speaking|writing> (required)');
    console.error('\nUse --help for usage information.');
    process.exit(1);
  }

  // Validate type
  if (!['speaking', 'writing'].includes(flags.type)) {
    console.error('❌ Invalid type. Must be "speaking" or "writing"');
    process.exit(1);
  }

  // Fail fast with a clear message when the file cannot be read
  try {
    readFileSync(flags.file, 'utf-8');
  } catch (error) {
    console.error(`❌ File not found or not readable: ${flags.file}`);
    process.exit(1);
  }

  try {
    // Each task type is stored in its own table
    const tableName = flags.type === 'speaking' ? 'speaking_tasks' : 'writing_tasks';
    const migrator = new DataMigrator(tableName, flags.type);
    if (flags.dryRun) {
      console.log('πŸ” DRY RUN MODE - No changes will be made\n');
      const records = await migrator.loadJsonlFile(flags.file);
      console.log(`πŸ“Š Would migrate ${records.length} ${flags.type} tasks`);
      console.log(' Run without --dry-run to perform the migration');
    } else {
      // Perform migration, then report the resulting table row count
      await migrator.migrateFile(flags.file);
      await migrator.verifyMigration();
    }
    console.log('\nπŸŽ‰ Migration process completed!');
  } catch (error) {
    console.error('\n❌ Migration failed:', error.message);
    process.exit(1);
  }
}
// Run the script. A rejection that reaches this point is logged and the
// exit status is set to 1 so callers can detect the failure.
main().catch((error) => {
  console.error(error);
  process.exitCode = 1;
});