li / scripts /migrate_user_storage.py
xiaolongmr
Share user storage across spaces
8bb6984
Raw
History Blame Contribute Delete
5.17 kB
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import os
import sys
from pathlib import Path
from typing import Any
# Repository root: the directory one level above the directory containing this script.
ROOT_DIR = Path(__file__).resolve().parents[1]
# Put the repository root first on sys.path so the `services.*` imports that
# follow resolve when this file is executed directly as a script.
sys.path.insert(0, str(ROOT_DIR))
from services.storage.database_storage import DatabaseStorageBackend
from services.storage.factory import create_storage_backend
def _env_path(name: str, fallback: Path) -> Path:
    """Return the path held in environment variable *name*, or *fallback*.

    A variable that is unset, empty, or whitespace-only counts as missing.
    Surrounding whitespace is stripped before the value is turned into a Path.
    """
    raw = os.getenv(name)
    cleaned = str(raw or "").strip()
    if not cleaned:
        return fallback
    return Path(cleaned)
def _data_dir() -> Path:
    """Resolve the directory passed to the source storage backend.

    Precedence:
      1. ``CHATGPT2API_DATA_DIR`` when it is set to a non-blank value.
      2. The persist directory (``CHATGPT2API_PERSIST_DIR``, defaulting to
         ``/app/persist``) when that directory exists on disk.
      3. ``<ROOT_DIR>/data`` otherwise.
    """
    persist_setting = os.getenv("CHATGPT2API_PERSIST_DIR", "").strip()
    persist_dir = Path(persist_setting or "/app/persist")
    if persist_dir.exists():
        default_dir = persist_dir
    else:
        default_dir = ROOT_DIR / "data"
    return _env_path("CHATGPT2API_DATA_DIR", default_dir)
def _confirm_target_is_safe(target: DatabaseStorageBackend, *, force: bool) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    """Load the target's current data and refuse to proceed if it is non-empty.

    Args:
        target: Backend whose ``load_auth_keys()`` / ``load_shop_state()`` are read.
        force: When true, skip the emptiness check entirely.

    Returns:
        ``(auth_keys, shop_state)`` exactly as loaded from *target*.

    Raises:
        RuntimeError: *force* is false and either loaded value is truthy.
    """
    auth_keys = target.load_auth_keys()
    shop_state = target.load_shop_state()
    # NOTE(review): any truthy shop_state (even a dict holding only empty lists)
    # counts as "has data" here — confirm that matches what the backend returns
    # for a fresh database.
    if not force and (auth_keys or shop_state):
        raise RuntimeError(
            "Target shared user database already has auth_keys or shop_state. "
            "Use --merge to append/update safely, or --force to replace the target data."
        )
    return auth_keys, shop_state
def _merge_auth_keys(existing: list[dict[str, Any]], incoming: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Merge two auth-key lists, de-duplicating on each record's ``id``.

    Records are keyed by ``str(record["id"]).strip()``. When the same id occurs
    more than once, the later record wins (so *incoming* overrides *existing*)
    while keeping the position of the first occurrence. Every kept record is a
    shallow copy, so the inputs are never mutated.

    Entries that are not dicts, and records whose id is missing or blank, are
    skipped rather than raising; this mirrors how ``_merge_shop_state`` treats
    malformed entries. A ``None`` list is treated as empty.

    Args:
        existing: Records already present in the target.
        incoming: Records read from the source.

    Returns:
        The merged records, one per distinct id.
    """
    merged: dict[str, dict[str, Any]] = {}
    for item in [*(existing or []), *(incoming or [])]:
        if not isinstance(item, dict):
            # Malformed entry (e.g. None or a stray string): nothing to key on.
            continue
        item_id = str(item.get("id") or "").strip()
        if item_id:
            merged[item_id] = dict(item)
    return list(merged.values())
def _merge_shop_state(existing: dict[str, Any], incoming: dict[str, Any]) -> dict[str, Any]:
    """Merge the ``codes`` and ``ledger`` lists of two shop-state dicts.

    * ``codes`` are keyed by ``id``, falling back to ``code_hash``.
    * ``ledger`` entries are keyed by ``id`` and returned sorted by the string
      form of their ``time`` field (missing/falsy times sort first).

    For a repeated key the later record wins, so *incoming* overrides
    *existing*. Kept records are shallow copies. Non-dict entries and records
    without a usable key are skipped. Only ``codes`` and ``ledger`` are carried
    into the result; any other top-level keys are not copied.

    A state that is not a dict (for example ``None``), or a ``codes``/``ledger``
    value that is not a list, contributes no records instead of raising.

    Args:
        existing: Shop state already present in the target.
        incoming: Shop state read from the source.

    Returns:
        A new dict of the form ``{"codes": [...], "ledger": [...]}``.
    """

    def _records(state: Any, field: str) -> list[Any]:
        # Read one list field defensively; anything malformed yields no records.
        value = state.get(field) if isinstance(state, dict) else None
        return value if isinstance(value, list) else []

    def _index(records: list[Any], key_of: Any) -> dict[str, dict[str, Any]]:
        # Key dict records by their stripped identifier; later records win.
        indexed: dict[str, dict[str, Any]] = {}
        for record in records:
            if not isinstance(record, dict):
                continue
            record_id = str(key_of(record) or "").strip()
            if record_id:
                indexed[record_id] = dict(record)
        return indexed

    codes_by_id = _index(
        [*_records(existing, "codes"), *_records(incoming, "codes")],
        lambda record: record.get("id") or record.get("code_hash"),
    )
    ledger_by_id = _index(
        [*_records(existing, "ledger"), *_records(incoming, "ledger")],
        lambda record: record.get("id"),
    )
    return {
        "codes": list(codes_by_id.values()),
        "ledger": sorted(ledger_by_id.values(), key=lambda item: str(item.get("time") or "")),
    }
def main() -> int:
    """Copy auth keys and shop state from the local backend to the shared database.

    Flags:
        --merge    Combine source records with whatever the target already holds.
        --force    Overwrite the target with source data (takes precedence over --merge).
        --dry-run  Print the summary line but write nothing.

    With neither --merge nor --force, a non-empty target aborts the run.

    Returns:
        0 on success (including a dry run).

    Raises:
        RuntimeError: ``SHARED_USER_DATABASE_URL`` is unset/blank, or the target
            already holds data and neither --merge nor --force was given.
    """
    cli = argparse.ArgumentParser(
        description="Migrate users, credits, WeChat bindings, redeem codes, and redemption history to SHARED_USER_DATABASE_URL.",
    )
    boolean_flags = (
        ("--merge", "Merge source data into target instead of requiring an empty target."),
        ("--force", "Replace target auth_keys and shop_state with source data."),
        ("--dry-run", "Show what would be migrated without writing target data."),
    )
    for flag, help_text in boolean_flags:
        cli.add_argument(flag, action="store_true", help=help_text)
    options = cli.parse_args()

    database_url = str(os.getenv("SHARED_USER_DATABASE_URL") or "").strip()
    if not database_url:
        raise RuntimeError("SHARED_USER_DATABASE_URL is required.")

    source = create_storage_backend(_data_dir())
    target = DatabaseStorageBackend(database_url)

    source_auth_keys = source.load_auth_keys()
    source_shop_state = source.load_shop_state()
    # Either flag waives the "target must be empty" requirement.
    existing_auth_keys, existing_shop_state = _confirm_target_is_safe(
        target,
        force=options.force or options.merge,
    )

    # --force wins when both flags are supplied.
    merging = options.merge and not options.force
    if merging:
        target_auth_keys = _merge_auth_keys(existing_auth_keys, source_auth_keys)
        target_shop_state = _merge_shop_state(existing_shop_state, source_shop_state)
    else:
        target_auth_keys = source_auth_keys
        target_shop_state = source_shop_state

    if isinstance(target_shop_state, dict):
        code_count = len(target_shop_state.get("codes") or [])
        ledger_count = len(target_shop_state.get("ledger") or [])
    else:
        code_count = 0
        ledger_count = 0

    if options.dry_run:
        mode = "dry-run"
    elif merging:
        mode = "merge"
    else:
        mode = "replace"

    print(
        "[migrate-user-storage] prepared: "
        f"auth_keys={len(target_auth_keys)}, codes={code_count}, ledger={ledger_count}, "
        f"mode={mode}"
    )

    if not options.dry_run:
        target.save_auth_keys(target_auth_keys)
        # A non-dict shop state is written as an empty dict.
        shop_payload = target_shop_state if isinstance(target_shop_state, dict) else {}
        target.save_shop_state(shop_payload)
        print("[migrate-user-storage] migration completed.")
    return 0
# Run the migration only when executed as a script; main()'s return value
# becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())