Spaces:
Running
Running
| import os | |
| import sys | |
| from dotenv import load_dotenv | |
| # Add current directory to path | |
| sys.path.append(os.path.dirname(os.path.abspath(__file__))) | |
| from sqlalchemy import create_engine, text | |
| from app.database import engine, Base | |
| # Import models to ensure they are registered in Base.metadata | |
| from app.models.lyrics import Group, UserGroup, Lyric, lyric_groups, task_groups | |
| from app.models.process import Task | |
| from app.database import SessionLocal | |
def _link_missing_groups(db, records, select_sql, insert_sql, id_param):
    """Insert an association row for every record that does not have one yet.

    Args:
        db: Open session used to execute both statements.
        records: Objects exposing ``id`` and ``group_id`` attributes.
        select_sql: SQL that looks up an existing association row.
        insert_sql: SQL that creates the association row.
        id_param: Name of the bind parameter that receives ``record.id``.

    Returns:
        The number of association rows inserted.
    """
    # Build each statement object once rather than once per record.
    find_stmt = text(select_sql)
    add_stmt = text(insert_sql)

    inserted = 0
    for record in records:
        params = {id_param: record.id, "group_id": record.group_id}
        # Skip pairs that are already present to avoid inserting duplicates.
        if db.execute(find_stmt, params).fetchone() is None:
            db.execute(add_stmt, params)
            inserted += 1
    return inserted


def run_migration():
    """Create missing tables and copy ``group_id`` links into association tables.

    Calls ``Base.metadata.create_all`` and then, inside a single session,
    inserts a ``lyric_groups`` row for every ``Lyric`` with a non-null
    ``group_id`` and a ``task_groups`` row for every ``Task`` with a non-null
    ``group_id``. Pairs that already exist are skipped. Both migrations are
    committed together at the end.

    Raises:
        Exception: Any error raised while migrating is re-raised after the
            session has been rolled back.
    """
    print("Starting migration...")

    # 1. Create tables lyric_groups and task_groups if they do not exist
    print("Creating tables if they do not exist...")
    Base.metadata.create_all(bind=engine)
    print("Tables created successfully.")

    db = SessionLocal()
    try:
        # 2. Migrate existing group_id columns from lyrics to lyric_groups
        print("Migrating lyric groups...")
        lyrics_with_group = db.query(Lyric).filter(Lyric.group_id.isnot(None)).all()
        migrated_lyrics = _link_missing_groups(
            db,
            lyrics_with_group,
            "SELECT 1 FROM lyric_groups WHERE lyric_id = :lyric_id AND group_id = :group_id",
            "INSERT INTO lyric_groups (lyric_id, group_id) VALUES (:lyric_id, :group_id)",
            "lyric_id",
        )

        # 3. Migrate existing group_id columns from tasks to task_groups
        print("Migrating task groups...")
        tasks_with_group = db.query(Task).filter(Task.group_id.isnot(None)).all()
        migrated_tasks = _link_missing_groups(
            db,
            tasks_with_group,
            "SELECT 1 FROM task_groups WHERE task_id = :task_id AND group_id = :group_id",
            "INSERT INTO task_groups (task_id, group_id) VALUES (:task_id, :group_id)",
            "task_id",
        )

        # Commit both migrations together so a failure in either leaves
        # nothing half-applied.
        db.commit()
        print(f"Migration completed. Migrated {migrated_lyrics} lyrics and {migrated_tasks} tasks.")
    except Exception as e:
        db.rollback()
        print(f"Error during migration: {e}")
        # Bare raise re-raises the active exception with its original traceback.
        raise
    finally:
        db.close()
# Run the migration only when this file is executed as a script, not on import.
if __name__ == "__main__":
    run_migration()