gemini-business2api-github / scripts /migrate_to_database.py
lijunke
deploy: clean start with hf metadata
18081cf
#!/usr/bin/env python3
"""
数据库迁移脚本
用途:将数据从 kv_store 或本地文件迁移到新的数据库表结构
迁移内容:
- accounts (账户配置)
- settings (系统设置)
- stats (统计数据)
使用方法:
python scripts/migrate_to_database.py
迁移后:
- kv_store 数据保留(作为备份,仅 PostgreSQL)
- 本地文件重命名为 .migrated_YYYYMMDD-HHMMSS(防止重复迁移)
支持的数据库:
- PostgreSQL:配置 DATABASE_URL 环境变量
- SQLite:不配置 DATABASE_URL,自动使用 data/data.db
"""
import asyncio
import json
import os
import sqlite3
import sys
import time
from pathlib import Path
# 添加项目根目录到路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from dotenv import load_dotenv
load_dotenv()
def rename_migrated_file(file_path: str) -> "str | None":
    """Mark a migrated file by appending a ``.migrated_<timestamp>`` suffix.

    The timestamp is the local time formatted as ``YYYYMMDD-HHMMSS``.

    Args:
        file_path: Path of the file that has just been migrated.

    Returns:
        The new path, or ``None`` when ``file_path`` does not exist.
    """
    timestamp = time.strftime("%Y%m%d-%H%M%S")
    new_path = f"{file_path}.migrated_{timestamp}"
    # Attempt the rename directly instead of checking os.path.exists() first:
    # the file could disappear between the check and the rename.
    try:
        os.rename(file_path, new_path)
    except FileNotFoundError:
        return None
    return new_path
async def migrate_from_kv_store(conn):
    """Copy accounts, settings and stats from ``kv_store`` into the new tables.

    ``kv_store`` is only read, never modified.  Whenever a target table
    already holds data the user is asked on stdin before it is overwritten.

    Args:
        conn: Open database connection providing awaitable ``fetchval``,
            ``fetchrow`` and ``execute`` methods plus a ``transaction()``
            async context manager (``main`` passes an asyncpg connection).

    Returns:
        True if at least one item was written; False if ``kv_store`` does not
        exist or nothing was migrated.
    """
    print("\n" + "=" * 60)
    print("从 kv_store 迁移数据")
    print("=" * 60)

    # Nothing to do when the legacy table does not exist.
    kv_store_exists = await conn.fetchval(
        """
        SELECT EXISTS(
            SELECT 1 FROM information_schema.tables
            WHERE table_schema='public' AND table_name='kv_store'
        )
        """
    )
    if not kv_store_exists:
        print("❌ kv_store 表不存在,跳过")
        return False

    async def load_value(key):
        """Return ``(found, value)`` for *key*, decoding values stored as JSON text."""
        row = await conn.fetchrow("SELECT value FROM kv_store WHERE key = $1", key)
        if not row:
            return False, None
        value = row["value"]
        if isinstance(value, str):
            value = json.loads(value)
        return True, value

    def confirm_overwrite():
        """Ask on stdin whether existing data may be replaced."""
        answer = input("是否覆盖?(yes/no): ").strip().lower()
        return answer in ("yes", "y")

    async def write_accounts(accounts, clear_first):
        """Insert all accounts, optionally clearing the table first."""
        # One transaction for DELETE + INSERTs: if any insert fails, the
        # previous table contents are restored instead of being lost.
        async with conn.transaction():
            if clear_first:
                await conn.execute("DELETE FROM accounts")
            for position, acc in enumerate(accounts, 1):
                # Accounts without an id get a positional fallback id.
                account_id = acc.get("id") or f"account_{position}"
                await conn.execute(
                    """
                    INSERT INTO accounts (account_id, position, data, updated_at)
                    VALUES ($1, $2, $3, CURRENT_TIMESTAMP)
                    """,
                    account_id,
                    position,
                    json.dumps(acc, ensure_ascii=False),
                )

    async def write_json(table, key, payload, overwrite):
        """Store *payload* as JSON under *key*; upsert only when overwriting."""
        # ``table`` is always one of the hard-coded names below, never user input.
        statement = (
            f"INSERT INTO {table} (key, value, updated_at) "
            "VALUES ($1, $2, CURRENT_TIMESTAMP)"
        )
        if overwrite:
            statement += (
                " ON CONFLICT(key) DO UPDATE SET "
                "value = EXCLUDED.value, updated_at = CURRENT_TIMESTAMP"
            )
        await conn.execute(statement, key, json.dumps(payload, ensure_ascii=False))

    migrated_any = False

    # 1. accounts: must be a non-empty list.
    print("\n### 迁移 accounts ###")
    found, accounts = await load_value("accounts")
    if not found:
        print("⚠️ kv_store 中未找到 accounts")
    elif not (isinstance(accounts, list) and accounts):
        print("⚠️ kv_store 中 accounts 为空")
    else:
        count = await conn.fetchval("SELECT COUNT(*) FROM accounts")
        has_rows = count > 0
        if has_rows:
            print(f"⚠️ accounts 表已有 {count} 条记录")
        if has_rows and not confirm_overwrite():
            print("跳过 accounts 迁移")
        else:
            await write_accounts(accounts, clear_first=has_rows)
            print(f"✅ 成功迁移 {len(accounts)} 个账户")
            migrated_any = True

    # 2. settings and 3. stats: each is a single dict stored under one key.
    for key, table in (("settings", "kv_settings"), ("stats", "kv_stats")):
        print(f"\n### 迁移 {key} ###")
        found, payload = await load_value(key)
        if not found:
            print(f"⚠️ kv_store 中未找到 {key}")
            continue
        if not isinstance(payload, dict):
            print(f"⚠️ kv_store 中 {key} 格式错误")
            continue

        already_present = await conn.fetchval(
            f"SELECT 1 FROM {table} WHERE key = $1", key
        )
        if already_present:
            print(f"⚠️ {table} 表已有 {key} 记录")
            if not confirm_overwrite():
                print(f"跳过 {key} 迁移")
                continue

        await write_json(table, key, payload, overwrite=bool(already_present))
        print(f"✅ 成功迁移 {key}")
        migrated_any = True

    return migrated_any
async def migrate_from_local_files(conn):
    """Load ``data/accounts.json``, ``settings.yaml`` and ``stats.json`` into the database.

    Each file is handled independently: a failure in one is reported and the
    next file is still attempted.  When the target already holds data the user
    is asked on stdin before it is overwritten.  A successfully migrated file
    is renamed via ``rename_migrated_file`` so it is not migrated again.

    Args:
        conn: Open database connection providing awaitable ``fetchval`` and
            ``execute`` methods plus a ``transaction()`` async context manager
            (``main`` passes an asyncpg connection).

    Returns:
        True if at least one file was written to the database, else False.
    """
    print("\n" + "=" * 60)
    print("从本地文件迁移数据")
    print("=" * 60)

    data_dir = project_root / "data"
    accounts_file = data_dir / "accounts.json"
    settings_file = data_dir / "settings.yaml"
    stats_file = data_dir / "stats.json"

    def load_json(path):
        """Parse *path* as UTF-8 JSON."""
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)

    def load_yaml(path):
        """Parse *path* as UTF-8 YAML; an empty document yields ``{}``."""
        # Imported lazily so a missing PyYAML only fails the settings step.
        import yaml
        with open(path, "r", encoding="utf-8") as f:
            return yaml.safe_load(f) or {}

    def confirm_overwrite():
        """Ask on stdin whether existing data may be replaced."""
        answer = input("是否覆盖?(yes/no): ").strip().lower()
        return answer in ("yes", "y")

    def archive(path):
        """Rename the migrated file and report the new name."""
        new_path = rename_migrated_file(str(path))
        # rename_migrated_file returns None if the file vanished meanwhile.
        if new_path:
            print(f"✅ 文件已重命名: {new_path}")

    async def write_accounts(accounts, clear_first):
        """Insert all accounts, optionally clearing the table first."""
        # One transaction for DELETE + INSERTs: if any insert fails, the
        # previous table contents are restored instead of being lost.
        async with conn.transaction():
            if clear_first:
                await conn.execute("DELETE FROM accounts")
            for position, acc in enumerate(accounts, 1):
                # Accounts without an id get a positional fallback id.
                account_id = acc.get("id") or f"account_{position}"
                await conn.execute(
                    """
                    INSERT INTO accounts (account_id, position, data, updated_at)
                    VALUES ($1, $2, $3, CURRENT_TIMESTAMP)
                    """,
                    account_id,
                    position,
                    json.dumps(acc, ensure_ascii=False),
                )

    async def write_json(table, key, payload, overwrite):
        """Store *payload* as JSON under *key*; upsert only when overwriting."""
        # ``table`` is always one of the hard-coded names below, never user input.
        statement = (
            f"INSERT INTO {table} (key, value, updated_at) "
            "VALUES ($1, $2, CURRENT_TIMESTAMP)"
        )
        if overwrite:
            statement += (
                " ON CONFLICT(key) DO UPDATE SET "
                "value = EXCLUDED.value, updated_at = CURRENT_TIMESTAMP"
            )
        await conn.execute(statement, key, json.dumps(payload, ensure_ascii=False))

    migrated_any = False

    # 1. accounts.json: must contain a non-empty list.
    print("\n### 迁移 accounts.json ###")
    if not accounts_file.exists():
        print("⚠️ accounts.json 不存在")
    else:
        try:
            accounts = load_json(accounts_file)
            if not (isinstance(accounts, list) and accounts):
                print("⚠️ accounts.json 为空")
            else:
                count = await conn.fetchval("SELECT COUNT(*) FROM accounts")
                has_rows = count > 0
                if has_rows:
                    print(f"⚠️ accounts 表已有 {count} 条记录")
                if has_rows and not confirm_overwrite():
                    print("跳过 accounts.json 迁移")
                else:
                    await write_accounts(accounts, clear_first=has_rows)
                    print(f"✅ 成功迁移 {len(accounts)} 个账户")
                    # Data is committed at this point, even if the rename fails.
                    migrated_any = True
                    archive(accounts_file)
        except Exception as e:
            print(f"❌ 迁移 accounts.json 失败: {e}")

    # 2. settings.yaml and 3. stats.json: each holds a single dict.
    kv_sources = (
        (settings_file, load_yaml, "kv_settings", "settings"),
        (stats_file, load_json, "kv_stats", "stats"),
    )
    for path, loader, table, key in kv_sources:
        print(f"\n### 迁移 {path.name} ###")
        if not path.exists():
            print(f"⚠️ {path.name} 不存在")
            continue
        try:
            payload = loader(path)
            if not isinstance(payload, dict):
                print(f"⚠️ {path.name} 格式错误")
                continue

            already_present = await conn.fetchval(
                f"SELECT 1 FROM {table} WHERE key = $1", key
            )
            if already_present:
                print(f"⚠️ {table} 表已有 {key} 记录")
                if not confirm_overwrite():
                    print(f"跳过 {path.name} 迁移")
                    continue

            await write_json(table, key, payload, overwrite=bool(already_present))
            print(f"✅ 成功迁移 {key}")
            # Data is committed at this point, even if the rename fails.
            migrated_any = True
            archive(path)
        except Exception as e:
            print(f"❌ 迁移 {path.name} 失败: {e}")

    return migrated_any
def _init_sqlite_tables(conn: sqlite3.Connection) -> None:
    """Create the SQLite tables and index used by the migration if they are missing.

    Every statement uses ``IF NOT EXISTS``, so calling this on an already
    initialised database changes nothing.

    Args:
        conn: Open SQLite connection to create the schema on.
    """
    schema_statements = (
        # One row per account; ``data`` holds the account serialised as JSON.
        """
        CREATE TABLE IF NOT EXISTS accounts (
            account_id TEXT PRIMARY KEY,
            position INTEGER NOT NULL,
            data TEXT NOT NULL,
            updated_at TEXT DEFAULT CURRENT_TIMESTAMP
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS kv_settings (
            key TEXT PRIMARY KEY,
            value TEXT NOT NULL,
            updated_at TEXT DEFAULT CURRENT_TIMESTAMP
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS kv_stats (
            key TEXT PRIMARY KEY,
            value TEXT NOT NULL,
            updated_at TEXT DEFAULT CURRENT_TIMESTAMP
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS task_history (
            id TEXT PRIMARY KEY,
            data TEXT NOT NULL,
            created_at TEXT NOT NULL
        )
        """,
        """
        CREATE INDEX IF NOT EXISTS task_history_created_at_idx
        ON task_history(created_at DESC)
        """,
    )
    # The connection context manager commits on success, rolls back on error.
    with conn:
        for ddl in schema_statements:
            conn.execute(ddl)
def migrate_from_local_files_sqlite(conn: sqlite3.Connection) -> bool:
    """Load ``data/accounts.json``, ``settings.yaml`` and ``stats.json`` into SQLite.

    Each file is handled independently: a failure in one is reported and the
    next file is still attempted.  When the target already holds data the user
    is asked on stdin before it is overwritten.  A successfully migrated file
    is renamed via ``rename_migrated_file`` so it is not migrated again.

    Args:
        conn: Open SQLite connection whose tables already exist.

    Returns:
        True if at least one file was written to the database, else False.
    """
    print("\n" + "=" * 60)
    print("从本地文件迁移数据")
    print("=" * 60)

    data_dir = project_root / "data"
    accounts_file = data_dir / "accounts.json"
    settings_file = data_dir / "settings.yaml"
    stats_file = data_dir / "stats.json"

    def load_json(path):
        """Parse *path* as UTF-8 JSON."""
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)

    def load_yaml(path):
        """Parse *path* as UTF-8 YAML."""
        # Imported lazily so a missing PyYAML only fails the settings step.
        import yaml
        with open(path, "r", encoding="utf-8") as f:
            return yaml.safe_load(f)

    def confirm_overwrite():
        """Ask on stdin whether existing data may be replaced."""
        answer = input("是否覆盖?(yes/no): ").strip().lower()
        return answer in ("yes", "y")

    def archive(path):
        """Rename the migrated file and report old and new names."""
        new_path = rename_migrated_file(str(path))
        if new_path:
            # The separator keeps the two file names from running together.
            print(f"✅ 已重命名: {path.name} → {Path(new_path).name}")

    def write_accounts(accounts, clear_first):
        """Insert all accounts, optionally clearing the table first."""
        # ``with conn`` commits on success and rolls back on error, so the
        # DELETE and the INSERTs are applied together or not at all.
        with conn:
            if clear_first:
                conn.execute("DELETE FROM accounts")
            for position, acc in enumerate(accounts, 1):
                # Accounts without an id get a positional fallback id.
                account_id = acc.get("id") or f"account_{position}"
                conn.execute(
                    """
                    INSERT INTO accounts (account_id, position, data, updated_at)
                    VALUES (?, ?, ?, CURRENT_TIMESTAMP)
                    """,
                    (account_id, position, json.dumps(acc, ensure_ascii=False)),
                )

    def write_json(table, key, payload, overwrite):
        """Store *payload* as JSON under *key*; upsert only when overwriting."""
        # ``table`` is always one of the hard-coded names below, never user input.
        statement = (
            f"INSERT INTO {table} (key, value, updated_at) "
            "VALUES (?, ?, CURRENT_TIMESTAMP)"
        )
        if overwrite:
            statement += (
                " ON CONFLICT(key) DO UPDATE SET "
                "value = excluded.value, updated_at = CURRENT_TIMESTAMP"
            )
        with conn:
            conn.execute(statement, (key, json.dumps(payload, ensure_ascii=False)))

    migrated_any = False

    # 1. accounts.json: must contain a non-empty list.
    print("\n### 迁移 accounts ###")
    if not accounts_file.exists():
        print("⚠️ 未找到 accounts.json")
    else:
        try:
            accounts = load_json(accounts_file)
            if not (isinstance(accounts, list) and accounts):
                print("⚠️ accounts.json 为空或格式错误")
            else:
                count = conn.execute("SELECT COUNT(*) FROM accounts").fetchone()[0]
                has_rows = count > 0
                if has_rows:
                    print(f"⚠️ accounts 表已有 {count} 条记录")
                if has_rows and not confirm_overwrite():
                    print("跳过 accounts 迁移")
                else:
                    write_accounts(accounts, clear_first=has_rows)
                    print(f"✅ 成功迁移 {len(accounts)} 个账户")
                    migrated_any = True
                    archive(accounts_file)
        except Exception as e:
            print(f"❌ 迁移 accounts 失败: {e}")

    # 2. settings.yaml and 3. stats.json: any non-empty document is stored.
    kv_sources = (
        (settings_file, load_yaml, "kv_settings", "settings"),
        (stats_file, load_json, "kv_stats", "stats"),
    )
    for path, loader, table, key in kv_sources:
        print(f"\n### 迁移 {key} ###")
        if not path.exists():
            print(f"⚠️ 未找到 {path.name}")
            continue
        try:
            payload = loader(path)
            if not payload:
                print(f"⚠️ {path.name} 为空")
                continue

            existing = conn.execute(
                f"SELECT 1 FROM {table} WHERE key = ?", (key,)
            ).fetchone()
            overwrite = existing is not None
            if overwrite:
                print(f"⚠️ {table} 表已有 {key} 记录")
                if not confirm_overwrite():
                    print(f"跳过 {key} 迁移")
                    continue

            write_json(table, key, payload, overwrite)
            print(f"✅ 成功迁移 {key}")
            migrated_any = True
            archive(path)
        except Exception as e:
            print(f"❌ 迁移 {key} 失败: {e}")

    return migrated_any
async def main():
    """Run the interactive migration against PostgreSQL or SQLite.

    The backend is PostgreSQL when the ``DATABASE_URL`` environment variable
    is set and SQLite (``data/data.db`` under the project root) otherwise.
    The user must confirm on stdin before anything is migrated.

    Returns:
        True when the migration ran to completion (even if there was nothing
        to migrate); False when the user cancelled or an error occurred.
    """
    database_url = os.environ.get("DATABASE_URL", "").strip()

    # Decide which backend to use.
    if database_url:
        backend = "postgres"
        # Show only what follows the LAST '@' (host/database).  Splitting on
        # the first '@' would print part of a password that contains '@'.
        db_info = database_url.rsplit("@", 1)[1] if "@" in database_url else "PostgreSQL"
    else:
        backend = "sqlite"
        db_info = "data/data.db (SQLite)"

    print("=" * 60)
    print("数据库迁移脚本")
    print("=" * 60)
    print(f"目标数据库: {db_info}")
    print()
    print("迁移内容:")
    if backend == "postgres":
        print(" 1. kv_store → 新表(accounts, kv_settings, kv_stats)")
        print(" 2. 本地文件 → 新表")
    else:
        print(" 1. 本地文件 → SQLite 数据库")
    print()
    print("迁移后:")
    if backend == "postgres":
        print(" - kv_store 保留(作为备份)")
    print(" - 本地文件重命名为 .migrated_YYYYMMDD-HHMMSS")
    print()

    confirm = input("开始迁移?(yes/no): ").strip().lower()
    if confirm not in ("yes", "y"):
        print("❌ 操作已取消")
        return False

    try:
        if backend == "postgres":
            # Imported lazily so SQLite-only setups do not need asyncpg.
            import asyncpg
            conn = await asyncpg.connect(database_url)
            try:
                # 1. Migrate from kv_store, 2. then from local files.
                kv_migrated = await migrate_from_kv_store(conn)
                file_migrated = await migrate_from_local_files(conn)
            finally:
                # Close the connection even when a migration step raises.
                await conn.close()
        else:
            sqlite_path = project_root / "data" / "data.db"
            os.makedirs(sqlite_path.parent, exist_ok=True)
            conn = sqlite3.connect(str(sqlite_path))
            try:
                conn.row_factory = sqlite3.Row
                # Create the tables before migrating into them.
                _init_sqlite_tables(conn)
                file_migrated = migrate_from_local_files_sqlite(conn)
            finally:
                # Close the connection even when a migration step raises.
                conn.close()
            # SQLite has no kv_store source.
            kv_migrated = False

        print("\n" + "=" * 60)
        if kv_migrated or file_migrated:
            print("✅ 迁移完成!")
        else:
            print("⚠️ 没有数据需要迁移")
        print("=" * 60)
        print()
        print("下一步:")
        print("1. 重启应用")
        print("2. 应用会自动使用新表数据")
        print()
        return True
    except ImportError as e:
        if "asyncpg" in str(e):
            print("❌ 错误:未安装 asyncpg")
            print(" 请运行: pip install asyncpg")
        else:
            print(f"❌ 错误:{e}")
        return False
    except Exception as e:
        print(f"❌ 错误:{e}")
        import traceback
        traceback.print_exc()
        return False
if __name__ == "__main__":
    # Turn main()'s boolean result into the process exit status:
    # 0 when it returned a truthy value, 1 otherwise.
    exit_status = 0 if asyncio.run(main()) else 1
    sys.exit(exit_status)