#!/usr/bin/env python3 from __future__ import annotations import argparse import os import sys from pathlib import Path from typing import Any ROOT_DIR = Path(__file__).resolve().parents[1] sys.path.insert(0, str(ROOT_DIR)) from services.storage.database_storage import DatabaseStorageBackend from services.storage.factory import create_storage_backend def _env_path(name: str, fallback: Path) -> Path: value = str(os.getenv(name) or "").strip() return Path(value) if value else fallback def _data_dir() -> Path: persist_dir = Path(os.getenv("CHATGPT2API_PERSIST_DIR", "").strip() or "/app/persist") return _env_path("CHATGPT2API_DATA_DIR", persist_dir if persist_dir.exists() else ROOT_DIR / "data") def _confirm_target_is_safe(target: DatabaseStorageBackend, *, force: bool) -> tuple[list[dict[str, Any]], dict[str, Any]]: existing_auth_keys = target.load_auth_keys() existing_shop_state = target.load_shop_state() if force: return existing_auth_keys, existing_shop_state if existing_auth_keys or existing_shop_state: raise RuntimeError( "Target shared user database already has auth_keys or shop_state. " "Use --merge to append/update safely, or --force to replace the target data." ) return existing_auth_keys, existing_shop_state def _merge_auth_keys(existing: list[dict[str, Any]], incoming: list[dict[str, Any]]) -> list[dict[str, Any]]: merged: dict[str, dict[str, Any]] = {} for item in existing: item_id = str(item.get("id") or "").strip() if item_id: merged[item_id] = dict(item) for item in incoming: item_id = str(item.get("id") or "").strip() if item_id: merged[item_id] = dict(item) return list(merged.values()) def _merge_shop_state(existing: dict[str, Any], incoming: dict[str, Any]) -> dict[str, Any]: existing_codes = existing.get("codes") if isinstance(existing.get("codes"), list) else [] incoming_codes = incoming.get("codes") if isinstance(incoming.get("codes"), list) else [] existing_ledger = existing.get("ledger") if isinstance(existing.get("ledger"), list) else [] incoming_ledger = incoming.get("ledger") if isinstance(incoming.get("ledger"), list) else [] codes_by_id: dict[str, dict[str, Any]] = {} for item in [*existing_codes, *incoming_codes]: if not isinstance(item, dict): continue item_id = str(item.get("id") or item.get("code_hash") or "").strip() if item_id: codes_by_id[item_id] = dict(item) ledger_by_id: dict[str, dict[str, Any]] = {} for item in [*existing_ledger, *incoming_ledger]: if not isinstance(item, dict): continue item_id = str(item.get("id") or "").strip() if item_id: ledger_by_id[item_id] = dict(item) return { "codes": list(codes_by_id.values()), "ledger": sorted(ledger_by_id.values(), key=lambda item: str(item.get("time") or "")), } def main() -> int: parser = argparse.ArgumentParser( description="Migrate users, credits, WeChat bindings, redeem codes, and redemption history to SHARED_USER_DATABASE_URL.", ) parser.add_argument("--merge", action="store_true", help="Merge source data into target instead of requiring an empty target.") parser.add_argument("--force", action="store_true", help="Replace target auth_keys and shop_state with source data.") parser.add_argument("--dry-run", action="store_true", help="Show what would be migrated without writing target data.") args = parser.parse_args() target_url = str(os.getenv("SHARED_USER_DATABASE_URL") or "").strip() if not target_url: raise RuntimeError("SHARED_USER_DATABASE_URL is required.") source = create_storage_backend(_data_dir()) target = DatabaseStorageBackend(target_url) source_auth_keys = source.load_auth_keys() source_shop_state = source.load_shop_state() existing_auth_keys, existing_shop_state = _confirm_target_is_safe(target, force=args.force or args.merge) if args.merge and not args.force: target_auth_keys = _merge_auth_keys(existing_auth_keys, source_auth_keys) target_shop_state = _merge_shop_state(existing_shop_state, source_shop_state) else: target_auth_keys = source_auth_keys target_shop_state = source_shop_state code_count = len(target_shop_state.get("codes") or []) if isinstance(target_shop_state, dict) else 0 ledger_count = len(target_shop_state.get("ledger") or []) if isinstance(target_shop_state, dict) else 0 print( "[migrate-user-storage] prepared: " f"auth_keys={len(target_auth_keys)}, codes={code_count}, ledger={ledger_count}, " f"mode={'dry-run' if args.dry_run else 'merge' if args.merge and not args.force else 'replace'}" ) if args.dry_run: return 0 target.save_auth_keys(target_auth_keys) target.save_shop_state(target_shop_state if isinstance(target_shop_state, dict) else {}) print("[migrate-user-storage] migration completed.") return 0 if __name__ == "__main__": raise SystemExit(main())