import json
import logging
import os

logger = logging.getLogger(__name__)

# Location of the JSON config file; overridable through the environment.
CONFIG_PATH = os.getenv("CONFIG_PATH", "config.json")
# Keys dropped from any dict when persisting ("password" is swapped for a
# placeholder instead when DEEPSEEK_PASSWORD is set).
SENSITIVE_ACCOUNT_FIELDS = {"password", "token", "hif_dliq", "hif_leim"}
# Placeholders written to disk in place of environment-provided secrets.
KEYS_PLACEHOLDER = "$DEEPSEEK_API_KEY"
PASSWORD_PLACEHOLDER = "$DEEPSEEK_PASSWORD"
# Environment variables that may each carry one account identity.
ACCOUNT_ENV_NAMES = [
    "DEEPSEEK_ACCOUNT",
    "DEEPSEEK_ACCOUNT2",
    "DEEPSEEK_ACCOUNT3",
    "DEEPSEEK_ACCOUNT4",
]


def _account_placeholders() -> set[str]:
    """Return the ``$NAME`` placeholder for every account env variable."""
    return {f"${name}" for name in ACCOUNT_ENV_NAMES}


def _account_from_value(account: str, password: str) -> dict:
    """Build an account dict from an identity string and a password.

    An identity containing ``@`` is stored under ``"email"``; anything else
    is stored under ``"mobile"``. The token always starts out empty.
    """
    env_account = {"password": password, "token": ""}
    if "@" in account:
        env_account["email"] = account
    else:
        env_account["mobile"] = account
    return env_account


def _accounts_from_env() -> list[dict]:
    """Collect the accounts described by the environment.

    Returns an empty list when ``DEEPSEEK_PASSWORD`` is unset or blank.
    Otherwise every non-empty, not-yet-seen value among ``ACCOUNT_ENV_NAMES``
    becomes one account sharing that password, in declaration order.
    """
    password = os.getenv("DEEPSEEK_PASSWORD", "").strip()
    if not password:
        return []

    accounts = []
    seen = set()
    for env_name in ACCOUNT_ENV_NAMES:
        account = os.getenv(env_name, "").strip()
        if not account or account in seen:
            continue
        seen.add(account)
        accounts.append(_account_from_value(account, password))
    return accounts


def _merge_env_secrets(config: dict) -> dict:
    """Overlay environment-provided secrets onto a loaded config.

    Args:
        config: The parsed config. Anything that is not a dict is treated as
            an empty config.

    Returns:
        A shallow copy of ``config`` in which:

        * ``"keys"`` is replaced by ``[DEEPSEEK_API_KEY]`` when that variable
          is set;
        * when env accounts exist, ``"accounts"`` is rebuilt: entries whose
          identity matches an env account (directly, or through a
          ``$DEEPSEEK_ACCOUNT*`` placeholder) keep their non-secret fields and
          receive the env identity and password; unrelated dict entries are
          kept untouched; non-dict entries are dropped; env accounts that no
          entry claimed are appended at the end.
    """
    merged = dict(config) if isinstance(config, dict) else {}

    api_key = os.getenv("DEEPSEEK_API_KEY", "").strip()
    if api_key:
        merged["keys"] = [api_key]

    env_accounts = _accounts_from_env()
    if not env_accounts:
        return merged

    accounts = merged.get("accounts")
    if not isinstance(accounts, list):
        accounts = []

    placeholders = _account_placeholders()
    # Fields that are always taken from the environment, never from the file.
    replaced_fields = SENSITIVE_ACCOUNT_FIELDS | {"email", "mobile"}
    # Env accounts still waiting to be matched, keyed by identity.
    desired_accounts = {
        (account.get("email") or account.get("mobile")): account
        for account in env_accounts
    }

    merged_accounts = []
    for account in accounts:
        if not isinstance(account, dict):
            continue

        current_identity = account.get("email") or account.get("mobile")
        if not isinstance(current_identity, str):
            # Missing or malformed identity: cannot refer to an env account.
            merged_accounts.append(account)
            continue

        desired_account = desired_accounts.get(current_identity)
        if desired_account is None:
            if current_identity not in placeholders:
                merged_accounts.append(account)
                continue
            # Resolve "$DEEPSEEK_ACCOUNTn" through its own variable so each
            # placeholder entry is paired with the right env account and its
            # extra fields survive a save/load round trip.
            env_value = os.getenv(current_identity[1:], "").strip()
            desired_account = desired_accounts.get(env_value)
            if desired_account is None and len(desired_accounts) == 1:
                # Unresolvable placeholder, but only one candidate is left.
                desired_account = next(iter(desired_accounts.values()))
            if desired_account is None:
                # Ambiguous or already-consumed placeholder: drop the entry.
                continue

        desired_identity = (
            desired_account.get("email") or desired_account.get("mobile")
        )
        identity_key = "email" if "email" in desired_account else "mobile"
        updated = {
            k: v for k, v in account.items() if k not in replaced_fields
        }
        updated[identity_key] = desired_identity
        updated["password"] = desired_account["password"]
        merged_accounts.append(updated)
        desired_accounts.pop(desired_identity, None)

    # Env accounts no config entry claimed are appended as-is.
    merged_accounts.extend(desired_accounts.values())
    merged["accounts"] = merged_accounts
    return merged


def _sanitize_for_persistence(obj):
    """Return a copy of ``obj`` that is safe to write to disk.

    Dicts are processed recursively with these rules per key:

    * keys starting with ``_`` are dropped;
    * ``"keys"`` becomes ``[KEYS_PLACEHOLDER]`` when ``DEEPSEEK_API_KEY`` is
      set and is dropped otherwise;
    * ``"password"`` becomes ``PASSWORD_PLACEHOLDER`` when
      ``DEEPSEEK_PASSWORD`` is set and is dropped otherwise;
    * the remaining ``SENSITIVE_ACCOUNT_FIELDS`` are dropped;
    * a string ``"mobile"``/``"email"`` equal to one of the account env
      variables is replaced by that variable's ``$NAME`` placeholder.

    Lists are sanitized element-wise; any other value is returned unchanged.
    """
    if isinstance(obj, list):
        return [_sanitize_for_persistence(item) for item in obj]
    if not isinstance(obj, dict):
        return obj

    sanitized = {}
    for key, value in obj.items():
        if key.startswith("_"):
            continue
        if key == "keys":
            if os.getenv("DEEPSEEK_API_KEY", "").strip():
                sanitized[key] = [KEYS_PLACEHOLDER]
            continue
        if key == "password":
            if os.getenv("DEEPSEEK_PASSWORD", "").strip():
                sanitized[key] = PASSWORD_PLACEHOLDER
            continue
        if key in SENSITIVE_ACCOUNT_FIELDS:
            continue
        if key in {"mobile", "email"} and isinstance(value, str):
            for env_name in ACCOUNT_ENV_NAMES:
                env_value = os.getenv(env_name, "").strip()
                if env_value and value == env_value:
                    sanitized[key] = f"${env_name}"
                    break
            else:
                # Not an env-provided identity: keep the literal value.
                sanitized[key] = _sanitize_for_persistence(value)
            continue
        sanitized[key] = _sanitize_for_persistence(value)
    return sanitized


def load_config() -> dict:
    """Load the config from ``CONFIG_PATH`` and overlay env secrets.

    Any failure to read or parse the file is logged as a warning and treated
    as an empty config, so the result then contains env-provided values only.
    """
    try:
        with open(CONFIG_PATH, "r", encoding="utf-8") as f:
            return _merge_env_secrets(json.load(f))
    except Exception as e:
        logger.warning("[load_config] 无法读取配置文件: %s", e)
        return _merge_env_secrets({})


def save_config(cfg) -> None:
    """Write the config back to ``CONFIG_PATH`` without secrets.

    API keys, passwords and session tokens are stripped or replaced by
    placeholders (see ``_sanitize_for_persistence``). Failures are logged and
    swallowed; this function never raises.
    """
    try:
        cfg_clean = _sanitize_for_persistence(cfg)
        # Serialize before opening the file: mode "w" truncates immediately,
        # so a serialization error must not be able to wipe the old config.
        payload = json.dumps(cfg_clean, ensure_ascii=False, indent=2)
        with open(CONFIG_PATH, "w", encoding="utf-8") as f:
            f.write(payload)
    except Exception as e:
        logger.error("[save_config] 写入 config.json 失败: %s", e)


CONFIG = load_config()