| |
| """数据库迁移脚本 - 为用户表添加单位、团队、物种字段 |
| |
| 确保用户表包含以下新字段: |
| 1. organization (VARCHAR) - 单位(崖州湾实验室、之江实验室) |
| 2. team (VARCHAR) - 团队 |
| 3. species (VARCHAR) - 物种 |
| |
| 这个脚本可以安全地多次运行(幂等性)。 |
| |
| 主要变更: |
| 1. 为 users 表添加 organization 列(如果不存在) |
| 2. 为 users 表添加 team 列(如果不存在) |
| 3. 为 users 表添加 species 列(如果不存在) |
| """ |
|
|
| import sys |
| from pathlib import Path |
|
|
| |
| project_root = Path(__file__).parent.parent |
| sys.path.insert(0, str(project_root)) |
|
|
| |
| import logging |
| import shutil |
|
|
| from sqlalchemy import inspect, text |
| from sqlalchemy.engine import Engine |
|
|
| from qa_annotate.database.base import DB_PATH, engine |
|
|
| |
| logging.basicConfig( |
| level=logging.INFO, |
| format="%(asctime)s - %(levelname)s - %(message)s", |
| datefmt="%Y-%m-%d %H:%M:%S", |
| ) |
| logger = logging.getLogger(__name__) |
|
|
|
|
| def backup_database(): |
| """备份数据库(SQLite)""" |
| from datetime import datetime |
|
|
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| backup_path = DB_PATH.parent / f"annotations_backup_{timestamp}.db" |
| shutil.copy2(DB_PATH, backup_path) |
| logger.info(f"SQLite 数据库已备份到: {backup_path}") |
| return backup_path |
|
|
|
|
| def disable_foreign_keys(conn): |
| """禁用外键约束""" |
| conn.execute(text("PRAGMA foreign_keys = OFF")) |
| logger.info("已禁用外键约束") |
|
|
|
|
| def enable_foreign_keys(conn): |
| """启用外键约束""" |
| conn.execute(text("PRAGMA foreign_keys = ON")) |
| logger.info("已启用外键约束") |
|
|
|
|
| def restore_from_backup(backup_path: Path): |
| """从备份恢复数据库(SQLite)""" |
| logger.warning("=" * 60) |
| logger.warning("开始从备份恢复数据库...") |
| logger.warning(f"备份路径: {backup_path}") |
| logger.warning("=" * 60) |
|
|
| if not backup_path.exists(): |
| logger.error(f"备份文件不存在: {backup_path}") |
| return False |
|
|
| try: |
| shutil.copy2(backup_path, DB_PATH) |
| logger.info("SQLite 数据库已从备份恢复") |
| return True |
| except Exception as e: |
| logger.error(f"恢复数据库失败: {e}") |
| return False |
|
|
|
|
| def table_exists(inspector: inspect, table_name: str) -> bool: |
| """检查表是否存在""" |
| return table_name in inspector.get_table_names() |
|
|
|
|
| def column_exists(inspector: inspect, table_name: str, column_name: str) -> bool: |
| """检查列是否存在""" |
| if not table_exists(inspector, table_name): |
| return False |
| columns = inspector.get_columns(table_name) |
| return any(col["name"] == column_name for col in columns) |
|
|
|
|
| def add_user_fields(engine: Engine): |
| """为用户表添加新字段(organization、team、species)""" |
| logger.info("检查 users 表的新字段...") |
|
|
| inspector = inspect(engine) |
|
|
| if not table_exists(inspector, "users"): |
| logger.error("users 表不存在,无法添加字段") |
| return False |
|
|
| changes_made = False |
|
|
| with engine.begin() as conn: |
| disable_foreign_keys(conn) |
|
|
| |
| if not column_exists(inspector, "users", "organization"): |
| logger.info("添加 organization 列...") |
| try: |
| conn.execute( |
| text(""" |
| ALTER TABLE users |
| ADD COLUMN organization VARCHAR |
| """) |
| ) |
| logger.info("organization 列已添加") |
| changes_made = True |
| except Exception as e: |
| logger.warning(f"无法通过 ALTER TABLE 添加 organization 列: {e}") |
| logger.error("添加 organization 列失败") |
| else: |
| logger.info("organization 列已存在,跳过") |
|
|
| |
| if not column_exists(inspector, "users", "team"): |
| logger.info("添加 team 列...") |
| try: |
| conn.execute( |
| text(""" |
| ALTER TABLE users |
| ADD COLUMN team VARCHAR |
| """) |
| ) |
| logger.info("team 列已添加") |
| changes_made = True |
| except Exception as e: |
| logger.warning(f"无法通过 ALTER TABLE 添加 team 列: {e}") |
| logger.error("添加 team 列失败") |
| else: |
| logger.info("team 列已存在,跳过") |
|
|
| |
| if not column_exists(inspector, "users", "species"): |
| logger.info("添加 species 列...") |
| try: |
| conn.execute( |
| text(""" |
| ALTER TABLE users |
| ADD COLUMN species VARCHAR |
| """) |
| ) |
| logger.info("species 列已添加") |
| changes_made = True |
| except Exception as e: |
| logger.warning(f"无法通过 ALTER TABLE 添加 species 列: {e}") |
| logger.error("添加 species 列失败") |
| else: |
| logger.info("species 列已存在,跳过") |
|
|
| enable_foreign_keys(conn) |
|
|
| return changes_made |
|
|
|
|
| def validate_migration(engine: Engine): |
| """验证迁移结果""" |
| logger.info("验证迁移结果...") |
|
|
| inspector = inspect(engine) |
| errors = [] |
|
|
| |
| if not table_exists(inspector, "users"): |
| errors.append("users 表不存在") |
| return False, errors |
|
|
| logger.info("✓ users 表存在") |
|
|
| |
| required_columns = ["organization", "team", "species"] |
| columns = {col["name"] for col in inspector.get_columns("users")} |
|
|
| for col in required_columns: |
| if col not in columns: |
| errors.append(f"users 表缺少列: {col}") |
| else: |
| logger.info(f" ✓ users 表有 {col} 列") |
|
|
| if errors: |
| logger.error("验证失败:") |
| for error in errors: |
| logger.error(f" - {error}") |
| return False, errors |
|
|
| logger.info("验证通过") |
| return True, [] |
|
|
|
|
| def main(): |
| """主函数""" |
| logger.info("=" * 60) |
| logger.info("开始数据库迁移 - 为用户表添加单位、团队、物种字段") |
| logger.info(f"数据库路径: {DB_PATH}") |
| logger.info("=" * 60) |
|
|
| |
| if not DB_PATH.exists(): |
| logger.error(f"数据库文件不存在: {DB_PATH}") |
| logger.info("如果这是新安装,请先运行应用以初始化数据库") |
| sys.exit(1) |
|
|
| |
| logger.info("备份数据库...") |
| backup_path = backup_database() |
| logger.info(f"备份完成: {backup_path}") |
|
|
| migration_success = False |
| changes_made = False |
|
|
| try: |
| |
| if add_user_fields(engine): |
| changes_made = True |
|
|
| if not changes_made: |
| logger.info("=" * 60) |
| logger.info("所有字段已存在,无需迁移") |
| logger.info("=" * 60) |
| return |
|
|
| |
| logger.info("=" * 60) |
| logger.info("开始验证迁移结果...") |
| logger.info("=" * 60) |
|
|
| is_valid, errors = validate_migration(engine) |
|
|
| if not is_valid: |
| logger.error("=" * 60) |
| logger.error("迁移验证失败!") |
| logger.error("=" * 60) |
| logger.error("错误详情:") |
| for error in errors: |
| logger.error(f" - {error}") |
| logger.error("=" * 60) |
| logger.error("开始回退到备份...") |
| logger.error("=" * 60) |
|
|
| if restore_from_backup(backup_path): |
| logger.error("已成功回退到备份") |
| else: |
| logger.error("回退失败,请手动恢复数据库") |
| sys.exit(1) |
|
|
| migration_success = True |
|
|
| logger.info("=" * 60) |
| logger.info("数据库迁移完成!") |
| logger.info("验证通过!") |
| logger.info("=" * 60) |
|
|
| |
| logger.info("=" * 60) |
| logger.info("迁移已完成,请确认是否接受此次迁移") |
| logger.info(f"备份文件位置: {backup_path}") |
| logger.info("=" * 60) |
|
|
| try: |
| user_input = input("确认接受迁移?(y/n): ").strip().lower() |
| if user_input in ("y", "yes"): |
| logger.info("=" * 60) |
| logger.info("用户已确认,迁移完成!") |
| logger.info("=" * 60) |
| else: |
| logger.warning("=" * 60) |
| logger.warning("用户已取消,开始回退到备份...") |
| logger.warning("=" * 60) |
|
|
| if restore_from_backup(backup_path): |
| logger.warning("已成功回退到备份") |
| logger.warning("迁移已取消") |
| else: |
| logger.error("回退失败,请手动恢复数据库") |
| sys.exit(0) |
|
|
| except (KeyboardInterrupt, EOFError): |
| logger.warning("") |
| logger.warning("=" * 60) |
| logger.warning("用户中断操作,开始回退到备份...") |
| logger.warning("=" * 60) |
|
|
| if restore_from_backup(backup_path): |
| logger.warning("已成功回退到备份") |
| logger.warning("迁移已取消") |
| else: |
| logger.error("回退失败,请手动恢复数据库") |
| sys.exit(0) |
|
|
| except Exception as e: |
| logger.error("=" * 60) |
| logger.error(f"迁移过程中发生异常: {e}", exc_info=True) |
| logger.error("=" * 60) |
|
|
| if not migration_success: |
| logger.error("开始回退到备份...") |
| if restore_from_backup(backup_path): |
| logger.error("已成功回退到备份") |
| else: |
| logger.error("回退失败,请手动恢复数据库") |
| sys.exit(1) |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|