Spaces:
Sleeping
Sleeping
| import { createClient } from '@supabase/supabase-js'; | |
| import { readFileSync } from 'fs'; | |
| import { fileURLToPath } from 'url'; | |
| import { dirname, join } from 'path'; | |
| import dotenv from 'dotenv'; | |
// Populate process.env from a local .env file before any setting is read
dotenv.config();

// ES modules do not provide __filename/__dirname, so derive them from the module URL
const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);

// Supabase connection settings
const { SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY } = process.env;

// Both settings are mandatory: abort with a usage hint when either is absent or empty
const hasCredentials = Boolean(SUPABASE_URL) && Boolean(SUPABASE_SERVICE_ROLE_KEY);
if (!hasCredentials) {
  const complaint = [
    'β Missing required environment variables:',
    ' - SUPABASE_URL',
    ' - SUPABASE_SERVICE_ROLE_KEY',
    '\nPlease check your .env file and try again.',
  ];
  for (const line of complaint) {
    console.error(line);
  }
  process.exit(1);
}

// Supabase client created with the service role key
const supabase = createClient(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY);

// Migration tuning: records per batch, and retry attempts per record after a failure
const BATCH_SIZE = 100;
const MAX_RETRIES = 3;
/**
 * Loads task records from a JSONL file and upserts them into a single
 * Supabase table, keeping running counters of the outcome of each record.
 */
class DataMigrator {
  /**
   * @param {string} tableName - Table name passed to `supabase.from()`.
   * @param {string} taskType - Task type label; only used in log output.
   */
  constructor(tableName, taskType) {
    this.tableName = tableName;
    this.taskType = taskType;
    this.stats = {
      total: 0,
      processed: 0,
      inserted: 0,
      updated: 0,
      skipped: 0,
      errors: 0
    };
  }

  /**
   * Reads a JSONL file and parses every non-blank line as JSON.
   * Lines that fail to parse are logged and skipped; lines that parse to
   * `null` are dropped as well.
   *
   * @param {string} filePath - Path of the JSONL file to read.
   * @returns {Promise<Array>} The parsed records, in file order.
   * @throws Rethrows any error raised while reading the file.
   */
  async loadJsonlFile(filePath) {
    try {
      console.log(`π Reading JSONL file: ${filePath}`);
      const content = readFileSync(filePath, 'utf-8');
      const records = content
        .split(/\r?\n/)
        // Trim BEFORE parsing: trim() also strips a leading U+FEFF byte-order
        // mark and other Unicode whitespace that JSON.parse would reject,
        // which previously caused valid records to be skipped.
        .map(line => line.trim())
        .filter(line => line.length > 0)
        .map(line => {
          try {
            return JSON.parse(line);
          } catch (parseError) {
            console.warn(`β οΈ Skipping invalid JSON line: ${line.substring(0, 50)}...`);
            return null;
          }
        })
        .filter(record => record !== null);
      console.log(`β Loaded ${records.length} records from JSONL file`);
      return records;
    } catch (error) {
      console.error(`β Error reading JSONL file: ${error.message}`);
      throw error;
    }
  }

  /**
   * Maps a raw record onto the row shape sent to the table.
   * Falsy optional fields (missing, empty string, 0, ...) become `null`, and
   * `updated_at` is set to the current time.
   *
   * @param {object} record - Parsed JSONL record.
   * @returns {object} Row object for the upsert call.
   */
  transformRecord(record) {
    return {
      id: record.id,
      task_type: record.task_type,
      task_description: record.task_description || null,
      rephrased_task: record.rephrased_task || null,
      vocabulary: record.vocabulary || null,
      response: record.response || null,
      response_alternative: record.response_alternative || null,
      brainstorm: record.brainstorm || null,
      brainstorm_alternative: record.brainstorm_alternative || null,
      updated_at: new Date().toISOString()
    };
  }

  /**
   * Upserts one record, retrying up to MAX_RETRIES times with a linearly
   * growing delay (1s, 2s, 3s, ...). Never rejects: a final failure is counted
   * in `stats.errors` and reported through the return value.
   *
   * @param {object} record - Parsed JSONL record.
   * @param {number} [retryCount=0] - Number of retries already performed.
   * @returns {Promise<{success: boolean, data?: *, error?: string}>}
   */
  async upsertRecord(record, retryCount = 0) {
    try {
      const transformedRecord = this.transformRecord(record);
      // Use upsert to handle duplicates (insert or update based on id and task_type)
      const { data, error, status } = await supabase
        .from(this.tableName)
        .upsert(transformedRecord, {
          onConflict: 'id,task_type',
          ignoreDuplicates: false
        })
        .select();
      if (error) {
        throw error;
      }
      // Classify the outcome from the HTTP status of the response.
      // NOTE(review): assumes 201 means "inserted" and 200 means "updated" —
      // confirm against the Supabase/PostgREST upsert behavior in use.
      if (status === 201) {
        this.stats.inserted++;
        console.log(`β Inserted record: ${record.id}-${record.task_type}`);
      } else if (status === 200) {
        this.stats.updated++;
        console.log(`π Updated record: ${record.id}-${record.task_type}`);
      } else {
        this.stats.skipped++;
        console.log(`βοΈ Skipped record: ${record.id}-${record.task_type}`);
      }
      return { success: true, data };
    } catch (error) {
      if (retryCount < MAX_RETRIES) {
        console.log(`π Retrying record ${record.id}-${record.task_type} (attempt ${retryCount + 1}/${MAX_RETRIES})`);
        // Back off a little longer on every attempt before trying again
        await new Promise(resolve => setTimeout(resolve, 1000 * (retryCount + 1)));
        return this.upsertRecord(record, retryCount + 1);
      }
      this.stats.errors++;
      console.error(`β Failed to process record ${record.id}-${record.task_type}: ${error.message}`);
      return { success: false, error: error.message };
    }
  }

  /**
   * Upserts up to BATCH_SIZE records, starting at `startIndex`, concurrently.
   *
   * @param {Array} records - All records being migrated.
   * @param {number} startIndex - Index of the first record of this batch.
   * @returns {Promise<number>} Index just past the last record of the batch.
   */
  async processBatch(records, startIndex) {
    const endIndex = Math.min(startIndex + BATCH_SIZE, records.length);
    const batch = records.slice(startIndex, endIndex);
    console.log(`\nπ¦ Processing batch ${Math.floor(startIndex / BATCH_SIZE) + 1}: records ${startIndex + 1}-${endIndex}`);
    const promises = batch.map(record => this.upsertRecord(record));
    const results = await Promise.allSettled(promises);
    const successful = results.filter(r => r.status === 'fulfilled' && r.value.success).length;
    // The 'rejected' test must come first: rejected results have no `value`
    const failed = results.filter(r => r.status === 'rejected' || !r.value.success).length;
    console.log(`π Batch completed: ${successful} successful, ${failed} failed`);
    return endIndex;
  }

  /**
   * Migrates every record of a JSONL file, batch by batch, printing progress
   * after each batch and the statistics at the end.
   *
   * @param {string} filePath - Path of the JSONL file to migrate.
   * @returns {Promise<object>} The `stats` counters of this migrator.
   * @throws Rethrows any error raised while loading or processing the file.
   */
  async migrateFile(filePath) {
    try {
      console.log(`π Starting migration for ${this.taskType} tasks...\n`);
      const records = await this.loadJsonlFile(filePath);
      this.stats.total = records.length;
      if (records.length === 0) {
        console.log('β οΈ No records to migrate');
        return this.stats;
      }
      // Process records in batches
      let currentIndex = 0;
      while (currentIndex < records.length) {
        currentIndex = await this.processBatch(records, currentIndex);
        this.stats.processed = currentIndex;
        // Show progress
        const progress = ((currentIndex / records.length) * 100).toFixed(1);
        console.log(`π Progress: ${progress}% (${currentIndex}/${records.length})`);
      }
      console.log('\nβ Migration completed!');
      this.printStats();
      return this.stats;
    } catch (error) {
      console.error(`β Migration failed: ${error.message}`);
      throw error;
    }
  }

  /**
   * Prints the counters and the success rate (inserted + updated over total).
   * The rate is reported as 0.0% when there are no records, rather than NaN.
   */
  printStats() {
    const { total, processed, inserted, updated, skipped, errors } = this.stats;
    // Guard the division: 0 / 0 would otherwise print "NaN%"
    const successRate = total > 0 ? (inserted + updated) / total * 100 : 0;
    console.log('\nπ Migration Statistics:');
    console.log(` Total records: ${total}`);
    console.log(` Processed: ${processed}`);
    console.log(` Inserted: ${inserted}`);
    console.log(` Updated: ${updated}`);
    console.log(` Skipped: ${skipped}`);
    console.log(` Errors: ${errors}`);
    console.log(` Success rate: ${successRate.toFixed(1)}%`);
  }

  /**
   * Asks the table for its exact row count (head request, no rows fetched)
   * and logs it.
   *
   * @returns {Promise<number>} The row count reported for the table.
   * @throws Rethrows the error returned by the count query.
   */
  async verifyMigration() {
    try {
      console.log(`\nπ Verifying migration for table: ${this.tableName}`);
      const { count, error } = await supabase
        .from(this.tableName)
        .select('*', { count: 'exact', head: true });
      if (error) {
        throw error;
      }
      console.log(`β Table ${this.tableName} contains ${count} records`);
      return count;
    } catch (error) {
      console.error(`β Verification failed: ${error.message}`);
      throw error;
    }
  }
}
/**
 * Command-line entry point.
 *
 * Parses `--file=<path>`, `--type=<speaking|writing>`, `--dry-run` and
 * `--help`/`-h` from `process.argv`, validates them, then either reports how
 * many records would be migrated (dry run) or runs the migration followed by
 * a row-count verification. Exits the process with status 1 on invalid
 * arguments, an unreadable file, or a failed migration, and with status 0
 * after printing the help text.
 *
 * @returns {Promise<void>}
 */
async function main() {
  console.log(' Starting data migration process...\n');
  const args = process.argv.slice(2);

  // Returns the text after `prefix` of the first matching argument, or null.
  // Slicing off the prefix (instead of splitting on '=') keeps values that
  // themselves contain '=' intact, e.g. --file=data/a=b.jsonl.
  const readOption = (prefix) => {
    const match = args.find(arg => arg.startsWith(prefix));
    return match ? match.slice(prefix.length) : null;
  };

  // Parse command line arguments
  const flags = {
    help: args.includes('--help') || args.includes('-h'),
    dryRun: args.includes('--dry-run'),
    file: readOption('--file='),
    type: readOption('--type=')
  };

  // Show help
  if (flags.help) {
    console.log(`
π Usage: node migrate-data.js [options]
Required:
--file=<path> Path to JSONL file to migrate
--type=<type> Type of tasks: 'speaking' or 'writing'
Options:
--dry-run Show what would be migrated without doing it
--help, -h Show this help message
Examples:
node migrate-data.js --file=data/speaking01.jsonl --type=speaking
node migrate-data.js --file=data/writing.jsonl --type=writing
node migrate-data.js --file=data/speaking01.jsonl --type=speaking --dry-run
`);
    process.exit(0);
  }

  // Validate required arguments (an empty value counts as missing)
  if (!flags.file || !flags.type) {
    console.error('β Missing required arguments:');
    if (!flags.file) console.error(' --file=<path> (required)');
    if (!flags.type) console.error(' --type=<speaking|writing> (required)');
    console.error('\nUse --help for usage information.');
    process.exit(1);
  }

  // Validate type
  if (!['speaking', 'writing'].includes(flags.type)) {
    console.error('β Invalid type. Must be "speaking" or "writing"');
    process.exit(1);
  }

  // Check that the file exists and is readable by attempting to read it
  try {
    readFileSync(flags.file, 'utf-8');
  } catch (error) {
    console.error(`β File not found or not readable: ${flags.file}`);
    process.exit(1);
  }

  try {
    // Determine table name
    const tableName = flags.type === 'speaking' ? 'speaking_tasks' : 'writing_tasks';
    // Create migrator
    const migrator = new DataMigrator(tableName, flags.type);
    if (flags.dryRun) {
      console.log('π DRY RUN MODE - No changes will be made\n');
      const records = await migrator.loadJsonlFile(flags.file);
      console.log(`π Would migrate ${records.length} ${flags.type} tasks`);
      console.log(' Run without --dry-run to perform the migration');
    } else {
      // Perform migration
      await migrator.migrateFile(flags.file);
      // Verify migration
      await migrator.verifyMigration();
    }
    console.log('\nπ Migration process completed!');
  } catch (error) {
    console.error('\nβ Migration failed:', error.message);
    process.exit(1);
  }
}
// Run the script. Any rejection that escapes main() is logged and also
// reflected in the exit status, so callers (shells, CI) can detect the failure.
main().catch((error) => {
  console.error(error);
  process.exitCode = 1;
});