| import json |
| import logging |
| import os |
|
|
| logger = logging.getLogger(__name__) |
|
|
| CONFIG_PATH = os.getenv("CONFIG_PATH", "config.json") |
| SENSITIVE_ACCOUNT_FIELDS = {"password", "token", "hif_dliq", "hif_leim"} |
| KEYS_PLACEHOLDER = "$DEEPSEEK_API_KEY" |
| PASSWORD_PLACEHOLDER = "$DEEPSEEK_PASSWORD" |
| ACCOUNT_ENV_NAMES = ["DEEPSEEK_ACCOUNT", "DEEPSEEK_ACCOUNT2", "DEEPSEEK_ACCOUNT3", "DEEPSEEK_ACCOUNT4"] |
|
|
|
|
| def _account_placeholders() -> set[str]: |
| return {f"${name}" for name in ACCOUNT_ENV_NAMES} |
|
|
|
|
| def _account_from_value(account: str, password: str) -> dict: |
| env_account = {"password": password, "token": ""} |
| if "@" in account: |
| env_account["email"] = account |
| else: |
| env_account["mobile"] = account |
| return env_account |
|
|
|
|
| def _accounts_from_env() -> list[dict]: |
| password = os.getenv("DEEPSEEK_PASSWORD", "").strip() |
| if not password: |
| return [] |
| accounts = [] |
| seen = set() |
| for env_name in ACCOUNT_ENV_NAMES: |
| account = os.getenv(env_name, "").strip() |
| if not account or account in seen: |
| continue |
| seen.add(account) |
| accounts.append(_account_from_value(account, password)) |
| return accounts |
|
|
|
|
| def _merge_env_secrets(config: dict) -> dict: |
| merged = dict(config) if isinstance(config, dict) else {} |
|
|
| api_key = os.getenv("DEEPSEEK_API_KEY", "").strip() |
| if api_key: |
| merged["keys"] = [api_key] |
|
|
| env_accounts = _accounts_from_env() |
| if env_accounts: |
| accounts = merged.get("accounts") |
| if not isinstance(accounts, list): |
| accounts = [] |
| placeholders = _account_placeholders() |
| desired_accounts = { |
| (account.get("email") or account.get("mobile")): account |
| for account in env_accounts |
| } |
| merged_accounts = [] |
| for account in accounts: |
| if not isinstance(account, dict): |
| continue |
| current_identity = account.get("email") or account.get("mobile") |
| if current_identity in desired_accounts or current_identity in placeholders: |
| desired_account = desired_accounts.get(current_identity) |
| if desired_account is None and len(desired_accounts) == 1: |
| desired_account = next(iter(desired_accounts.values())) |
| if desired_account is None: |
| continue |
| desired_identity = desired_account.get("email") or desired_account.get("mobile") |
| identity_key = "email" if "email" in desired_account else "mobile" |
| updated = { |
| k: v |
| for k, v in account.items() |
| if k not in {"email", "mobile", "password", "token", "hif_dliq", "hif_leim"} |
| } |
| updated[identity_key] = desired_identity |
| updated["password"] = desired_account["password"] |
| merged_accounts.append(updated) |
| desired_accounts.pop(desired_identity, None) |
| else: |
| merged_accounts.append(account) |
| for remaining in desired_accounts.values(): |
| merged_accounts.append(remaining) |
| merged["accounts"] = merged_accounts |
|
|
| return merged |
|
|
|
|
| def _sanitize_for_persistence(obj): |
| if isinstance(obj, dict): |
| sanitized = {} |
| for key, value in obj.items(): |
| if key.startswith("_"): |
| continue |
| if key == "keys": |
| if os.getenv("DEEPSEEK_API_KEY", "").strip(): |
| sanitized[key] = [KEYS_PLACEHOLDER] |
| continue |
| if key == "password": |
| if os.getenv("DEEPSEEK_PASSWORD", "").strip(): |
| sanitized[key] = PASSWORD_PLACEHOLDER |
| continue |
| if key in SENSITIVE_ACCOUNT_FIELDS: |
| continue |
| if key in {"mobile", "email"} and isinstance(value, str): |
| for env_name in ACCOUNT_ENV_NAMES: |
| env_value = os.getenv(env_name, "").strip() |
| if env_value and value == env_value: |
| sanitized[key] = f"${env_name}" |
| break |
| else: |
| sanitized[key] = _sanitize_for_persistence(value) |
| continue |
| sanitized[key] = _sanitize_for_persistence(value) |
| return sanitized |
| if isinstance(obj, list): |
| return [_sanitize_for_persistence(item) for item in obj] |
| return obj |
|
|
|
|
| def load_config(): |
| """从 config.json 加载配置,出错则返回空 dict""" |
| try: |
| with open(CONFIG_PATH, "r", encoding="utf-8") as f: |
| return _merge_env_secrets(json.load(f)) |
| except Exception as e: |
| logger.warning(f"[load_config] 无法读取配置文件: {e}") |
| return _merge_env_secrets({}) |
|
|
|
|
| def save_config(cfg): |
| """将配置写回 config.json,但不持久化密钥、密码或会话令牌。""" |
| try: |
| cfg_clean = _sanitize_for_persistence(cfg) |
| with open(CONFIG_PATH, "w", encoding="utf-8") as f: |
| json.dump(cfg_clean, f, ensure_ascii=False, indent=2) |
| except Exception as e: |
| logger.error(f"[save_config] 写入 config.json 失败: {e}") |
|
|
|
|
| CONFIG = load_config() |
|
|