| import { Database } from "$lib/server/database"; |
| import { migrations } from "./routines"; |
| import { acquireLock, releaseLock, isDBLocked, refreshLock } from "./lock"; |
| import { Semaphores } from "$lib/types/Semaphore"; |
| import { logger } from "$lib/server/logger"; |
| import { config } from "$lib/server/config"; |
|
|
/**
 * Applies every pending migration from `./routines` while holding the
 * `Semaphores.MIGRATION` lock.
 *
 * If the lock cannot be acquired, this call polls once per second until the
 * lock is no longer reported as held and then returns without running anything.
 *
 * When the lock is acquired, each migration is run inside its own client
 * session/transaction and its outcome ("ongoing", then "success" or "failure")
 * is upserted into the `migrationResults` collection. A migration whose `up`
 * throws is logged and recorded as "failure"; later migrations still run.
 *
 * The lock is refreshed every 10 seconds while work is in progress and is
 * always released, even when an unexpected error (e.g. a failed status write)
 * aborts the run.
 *
 * @returns {Promise<void>}
 * @throws {Error} If two entries in `migrations` share the same `_id`.
 */
export async function checkAndRunMigrations() {
  // Results are keyed by `_id`, so duplicate ids would make migrations indistinguishable.
  const uniqueIds = new Set(migrations.map((m) => m._id.toString()));
  if (uniqueIds.size !== migrations.length) {
    throw new Error("Duplicate migration GUIDs found.");
  }

  logger.debug("[MIGRATIONS] Begin check...");

  const database = await Database.getInstance();
  const connectedClient = await database.getClient().connect();

  const lockId = await acquireLock(Semaphores.MIGRATION);

  if (!lockId) {
    logger.debug(
      "[MIGRATIONS] Another instance already has the lock. Waiting for DB to be unlocked."
    );

    // Block until the lock holder is done so callers do not proceed mid-migration.
    while (await isDBLocked(Semaphores.MIGRATION)) {
      await new Promise((resolve) => setTimeout(resolve, 1000));
    }
    return;
  }

  // Keep the lock alive while migrations run. A failed refresh is logged
  // instead of surfacing as an unhandled promise rejection from the timer.
  const refreshInterval = setInterval(async () => {
    try {
      await refreshLock(Semaphores.MIGRATION, lockId);
    } catch (err) {
      logger.debug("[MIGRATIONS] Failed to refresh the migration lock.");
      logger.error(err);
    }
  }, 1000 * 10);

  try {
    const resultsCollection = database.getCollections().migrationResults;

    // Read the applied migrations only once the lock is held, so results written
    // by an instance that just released the lock are taken into account.
    const storedResults = await resultsCollection.find().toArray();
    const appliedIds = new Set(storedResults.map((r) => r._id.toString()));

    for (const migration of migrations) {
      const shouldRun = migration.runEveryTime || !appliedIds.has(migration._id.toString());

      if (!shouldRun) {
        logger.debug(`[MIGRATIONS] "${migration.name}" already applied. Skipping...`);
        continue;
      }

      // Some migrations are restricted to (or excluded from) HuggingChat deployments.
      if (
        (migration.runForHuggingChat === "only" && !config.isHuggingChat) ||
        (migration.runForHuggingChat === "never" && config.isHuggingChat)
      ) {
        logger.debug(
          `[MIGRATIONS] "${migration.name}" should not be applied for this run. Skipping...`
        );
        continue;
      }

      logger.debug(
        `[MIGRATIONS] "${migration.name}" ${
          migration.runEveryTime ? "should run every time" : "not applied yet"
        }. Applying...`
      );

      // Record the attempt before running so an interrupted run leaves a trace.
      await resultsCollection.updateOne(
        { _id: migration._id },
        { $set: { name: migration.name, status: "ongoing" } },
        { upsert: true }
      );

      const session = connectedClient.startSession();
      let result = false;

      try {
        await session.withTransaction(async () => {
          result = await migration.up(database);
        });
      } catch (e) {
        // A failing migration is recorded as "failure" below; it does not abort the run.
        logger.debug(`[MIGRATIONS] "${migration.name}" failed!`);
        logger.error(e);
      } finally {
        await session.endSession();
      }

      await resultsCollection.updateOne(
        { _id: migration._id },
        { $set: { name: migration.name, status: result ? "success" : "failure" } },
        { upsert: true }
      );
    }

    logger.debug("[MIGRATIONS] All migrations applied. Releasing lock");
  } finally {
    // Always stop refreshing and give the lock back, otherwise an error above
    // would leave the timer renewing the lock indefinitely.
    clearInterval(refreshInterval);
    await releaseLock(Semaphores.MIGRATION, lockId);
  }
}
|
|