melodix-api / migrate_multi_groups.py
GitHub Action
deploy from github actions
440bac0
Raw
History Blame Contribute Delete
2.76 kB
import os
import sys
from dotenv import load_dotenv
# Add current directory to path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from sqlalchemy import create_engine, text
from app.database import engine, Base
# Import models to ensure they are registered in Base.metadata
from app.models.lyrics import Group, UserGroup, Lyric, lyric_groups, task_groups
from app.models.process import Task
from app.database import SessionLocal
def _migrate_group_links(db, model, id_param, exists_stmt, insert_stmt):
    """Copy every non-null ``model.group_id`` into an association table.

    Args:
        db: Open session used for both the read and the writes. Nothing is
            committed here; the caller owns the transaction.
        model: Mapped class exposing ``id`` and ``group_id`` columns.
        id_param: Name of the bind parameter that carries ``model.id`` in
            both statements (e.g. ``"lyric_id"``).
        exists_stmt: Statement returning a row when the (id, group_id)
            association is already present.
        insert_stmt: Statement inserting the (id, group_id) association.

    Returns:
        Number of association rows inserted by this call.
    """
    # Only the two columns that are actually used are selected, so no full
    # ORM entities are loaded.
    rows = (
        db.query(model.id, model.group_id)
        .filter(model.group_id.isnot(None))
        .all()
    )

    migrated = 0
    for owner_id, group_id in rows:
        params = {id_param: owner_id, "group_id": group_id}
        # Skip pairs that are already linked so re-running the script does
        # not insert duplicates.
        if db.execute(exists_stmt, params).fetchone() is None:
            db.execute(insert_stmt, params)
            migrated += 1
    return migrated


def run_migration():
    """Move single ``group_id`` columns into the many-to-many link tables.

    Steps:
      1. Create any tables registered on ``Base.metadata`` that do not yet
         exist (this includes ``lyric_groups`` and ``task_groups``).
      2. For each lyric with a non-null ``group_id``, insert the matching
         row into ``lyric_groups`` unless it is already there.
      3. Do the same for tasks and ``task_groups``.

    Both data steps run in one session and are committed together; on any
    error the session is rolled back and the exception is re-raised.

    Raises:
        Exception: Whatever the database layer raised during migration.
    """
    print("Starting migration...")

    # 1. Create tables lyric_groups and task_groups if they do not exist
    print("Creating tables if they do not exist...")
    Base.metadata.create_all(bind=engine)
    print("Tables created successfully.")

    db = SessionLocal()
    try:
        # 2. Migrate existing group_id columns from lyrics to lyric_groups
        print("Migrating lyric groups...")
        migrated_lyrics = _migrate_group_links(
            db,
            Lyric,
            "lyric_id",
            text("SELECT 1 FROM lyric_groups WHERE lyric_id = :lyric_id AND group_id = :group_id"),
            text("INSERT INTO lyric_groups (lyric_id, group_id) VALUES (:lyric_id, :group_id)"),
        )

        # 3. Migrate existing group_id columns from tasks to task_groups
        print("Migrating task groups...")
        migrated_tasks = _migrate_group_links(
            db,
            Task,
            "task_id",
            text("SELECT 1 FROM task_groups WHERE task_id = :task_id AND group_id = :group_id"),
            text("INSERT INTO task_groups (task_id, group_id) VALUES (:task_id, :group_id)"),
        )

        db.commit()
        print(f"Migration completed. Migrated {migrated_lyrics} lyrics and {migrated_tasks} tasks.")
    except Exception as e:
        db.rollback()
        print(f"Error during migration: {e}")
        # Bare raise re-raises the active exception unchanged.
        raise
    finally:
        db.close()
if __name__ == "__main__":
    # Run the migration only when executed as a script, not on import.
    run_migration()