| |
| from __future__ import annotations |
|
|
| import argparse |
| import os |
| import sys |
| from pathlib import Path |
| from typing import Any |
|
|
| ROOT_DIR = Path(__file__).resolve().parents[1] |
| sys.path.insert(0, str(ROOT_DIR)) |
|
|
| from services.storage.database_storage import DatabaseStorageBackend |
| from services.storage.factory import create_storage_backend |
|
|
|
|
| def _env_path(name: str, fallback: Path) -> Path: |
| value = str(os.getenv(name) or "").strip() |
| return Path(value) if value else fallback |
|
|
|
|
| def _data_dir() -> Path: |
| persist_dir = Path(os.getenv("CHATGPT2API_PERSIST_DIR", "").strip() or "/app/persist") |
| return _env_path("CHATGPT2API_DATA_DIR", persist_dir if persist_dir.exists() else ROOT_DIR / "data") |
|
|
|
|
| def _confirm_target_is_safe(target: DatabaseStorageBackend, *, force: bool) -> tuple[list[dict[str, Any]], dict[str, Any]]: |
| existing_auth_keys = target.load_auth_keys() |
| existing_shop_state = target.load_shop_state() |
| if force: |
| return existing_auth_keys, existing_shop_state |
| if existing_auth_keys or existing_shop_state: |
| raise RuntimeError( |
| "Target shared user database already has auth_keys or shop_state. " |
| "Use --merge to append/update safely, or --force to replace the target data." |
| ) |
| return existing_auth_keys, existing_shop_state |
|
|
|
|
| def _merge_auth_keys(existing: list[dict[str, Any]], incoming: list[dict[str, Any]]) -> list[dict[str, Any]]: |
| merged: dict[str, dict[str, Any]] = {} |
| for item in existing: |
| item_id = str(item.get("id") or "").strip() |
| if item_id: |
| merged[item_id] = dict(item) |
| for item in incoming: |
| item_id = str(item.get("id") or "").strip() |
| if item_id: |
| merged[item_id] = dict(item) |
| return list(merged.values()) |
|
|
|
|
| def _merge_shop_state(existing: dict[str, Any], incoming: dict[str, Any]) -> dict[str, Any]: |
| existing_codes = existing.get("codes") if isinstance(existing.get("codes"), list) else [] |
| incoming_codes = incoming.get("codes") if isinstance(incoming.get("codes"), list) else [] |
| existing_ledger = existing.get("ledger") if isinstance(existing.get("ledger"), list) else [] |
| incoming_ledger = incoming.get("ledger") if isinstance(incoming.get("ledger"), list) else [] |
|
|
| codes_by_id: dict[str, dict[str, Any]] = {} |
| for item in [*existing_codes, *incoming_codes]: |
| if not isinstance(item, dict): |
| continue |
| item_id = str(item.get("id") or item.get("code_hash") or "").strip() |
| if item_id: |
| codes_by_id[item_id] = dict(item) |
|
|
| ledger_by_id: dict[str, dict[str, Any]] = {} |
| for item in [*existing_ledger, *incoming_ledger]: |
| if not isinstance(item, dict): |
| continue |
| item_id = str(item.get("id") or "").strip() |
| if item_id: |
| ledger_by_id[item_id] = dict(item) |
|
|
| return { |
| "codes": list(codes_by_id.values()), |
| "ledger": sorted(ledger_by_id.values(), key=lambda item: str(item.get("time") or "")), |
| } |
|
|
|
|
| def main() -> int: |
| parser = argparse.ArgumentParser( |
| description="Migrate users, credits, WeChat bindings, redeem codes, and redemption history to SHARED_USER_DATABASE_URL.", |
| ) |
| parser.add_argument("--merge", action="store_true", help="Merge source data into target instead of requiring an empty target.") |
| parser.add_argument("--force", action="store_true", help="Replace target auth_keys and shop_state with source data.") |
| parser.add_argument("--dry-run", action="store_true", help="Show what would be migrated without writing target data.") |
| args = parser.parse_args() |
|
|
| target_url = str(os.getenv("SHARED_USER_DATABASE_URL") or "").strip() |
| if not target_url: |
| raise RuntimeError("SHARED_USER_DATABASE_URL is required.") |
|
|
| source = create_storage_backend(_data_dir()) |
| target = DatabaseStorageBackend(target_url) |
|
|
| source_auth_keys = source.load_auth_keys() |
| source_shop_state = source.load_shop_state() |
| existing_auth_keys, existing_shop_state = _confirm_target_is_safe(target, force=args.force or args.merge) |
|
|
| if args.merge and not args.force: |
| target_auth_keys = _merge_auth_keys(existing_auth_keys, source_auth_keys) |
| target_shop_state = _merge_shop_state(existing_shop_state, source_shop_state) |
| else: |
| target_auth_keys = source_auth_keys |
| target_shop_state = source_shop_state |
|
|
| code_count = len(target_shop_state.get("codes") or []) if isinstance(target_shop_state, dict) else 0 |
| ledger_count = len(target_shop_state.get("ledger") or []) if isinstance(target_shop_state, dict) else 0 |
| print( |
| "[migrate-user-storage] prepared: " |
| f"auth_keys={len(target_auth_keys)}, codes={code_count}, ledger={ledger_count}, " |
| f"mode={'dry-run' if args.dry_run else 'merge' if args.merge and not args.force else 'replace'}" |
| ) |
|
|
| if args.dry_run: |
| return 0 |
|
|
| target.save_auth_keys(target_auth_keys) |
| target.save_shop_state(target_shop_state if isinstance(target_shop_state, dict) else {}) |
| print("[migrate-user-storage] migration completed.") |
| return 0 |
|
|
|
|
| if __name__ == "__main__": |
| raise SystemExit(main()) |
|
|