DS2api / app /config.py
cjovs's picture
Support DEEPSEEK_ACCOUNT4 secret
624fe7f verified
import json
import logging
import os
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Path of the JSON config file; the CONFIG_PATH environment variable overrides it.
CONFIG_PATH = os.environ.get("CONFIG_PATH", "config.json")

# Per-account fields that must never be written back to disk verbatim.
SENSITIVE_ACCOUNT_FIELDS = {
    "password",
    "token",
    "hif_dliq",
    "hif_leim",
}

# Placeholders stored in the config file in place of the real secrets.
KEYS_PLACEHOLDER = "$DEEPSEEK_API_KEY"
PASSWORD_PLACEHOLDER = "$DEEPSEEK_PASSWORD"

# Environment variables that may each carry one account identity.
ACCOUNT_ENV_NAMES = [
    "DEEPSEEK_ACCOUNT",
    "DEEPSEEK_ACCOUNT2",
    "DEEPSEEK_ACCOUNT3",
    "DEEPSEEK_ACCOUNT4",
]
def _account_placeholders() -> set[str]:
    """Return the "$NAME" placeholder string for every name in ACCOUNT_ENV_NAMES."""
    tokens = set()
    for env_name in ACCOUNT_ENV_NAMES:
        tokens.add("$" + env_name)
    return tokens
def _account_from_value(account: str, password: str) -> dict:
env_account = {"password": password, "token": ""}
if "@" in account:
env_account["email"] = account
else:
env_account["mobile"] = account
return env_account
def _accounts_from_env() -> list[dict]:
password = os.getenv("DEEPSEEK_PASSWORD", "").strip()
if not password:
return []
accounts = []
seen = set()
for env_name in ACCOUNT_ENV_NAMES:
account = os.getenv(env_name, "").strip()
if not account or account in seen:
continue
seen.add(account)
accounts.append(_account_from_value(account, password))
return accounts
def _merge_env_secrets(config: dict) -> dict:
    """Overlay secrets from environment variables onto a loaded config.

    Args:
        config: Parsed configuration. Anything that is not a dict is treated
            as an empty configuration.

    Returns:
        A shallow copy of ``config`` in which:

        * "keys" is replaced by ``[DEEPSEEK_API_KEY]`` when that variable is
          set;
        * when the environment provides accounts, "accounts" is rebuilt.
          Entries that name an env-provided identity, or one of the
          "$DEEPSEEK_ACCOUNT*" placeholders, receive the real identity and
          password and lose their other sensitive fields. Unrelated dict
          entries are kept as they are, non-dict entries are discarded, and
          env accounts with no matching entry are appended at the end.
    """
    merged = dict(config) if isinstance(config, dict) else {}

    api_key = os.getenv("DEEPSEEK_API_KEY", "").strip()
    if api_key:
        merged["keys"] = [api_key]

    env_accounts = _accounts_from_env()
    if not env_accounts:
        return merged

    accounts = merged.get("accounts")
    if not isinstance(accounts, list):
        accounts = []

    placeholders = _account_placeholders()
    # Env accounts not yet attached to a config entry, keyed by identity.
    desired_accounts = {
        (account.get("email") or account.get("mobile")): account
        for account in env_accounts
    }
    # Fields that are always re-derived from the environment, never copied.
    replaced_fields = {"email", "mobile"} | SENSITIVE_ACCOUNT_FIELDS

    merged_accounts = []
    for account in accounts:
        if not isinstance(account, dict):
            continue
        current_identity = account.get("email") or account.get("mobile")
        is_placeholder = current_identity in placeholders
        if current_identity not in desired_accounts and not is_placeholder:
            merged_accounts.append(account)
            continue

        desired_account = desired_accounts.get(current_identity)
        if desired_account is None and is_placeholder:
            # "$NAME" refers to the identity held by env var NAME, so resolve
            # it through that variable. This keeps each entry's non-secret
            # fields attached to the right account when several are set.
            env_identity = os.getenv(current_identity.removeprefix("$"), "").strip()
            desired_account = desired_accounts.get(env_identity)
        if desired_account is None and len(desired_accounts) == 1:
            # Fallback: the placeholder's own variable is unset or already
            # used; bind the entry to the only env account still unassigned.
            desired_account = next(iter(desired_accounts.values()))
        if desired_account is None:
            # Ambiguous or stale entry; whatever is left in desired_accounts
            # is appended after the loop.
            continue

        desired_identity = desired_account.get("email") or desired_account.get("mobile")
        identity_key = "email" if "email" in desired_account else "mobile"
        updated = {k: v for k, v in account.items() if k not in replaced_fields}
        updated[identity_key] = desired_identity
        updated["password"] = desired_account["password"]
        merged_accounts.append(updated)
        desired_accounts.pop(desired_identity, None)

    # Env accounts that matched no existing entry are added as new entries.
    merged_accounts.extend(desired_accounts.values())
    merged["accounts"] = merged_accounts
    return merged
def _sanitize_for_persistence(obj):
    """Return a copy of ``obj`` that is safe to write to the config file.

    Dicts and lists are walked recursively; any other value is returned
    unchanged. Within a dict:

    * keys starting with "_" are omitted;
    * "keys" becomes ``[KEYS_PLACEHOLDER]`` when DEEPSEEK_API_KEY is set;
    * "password" becomes ``PASSWORD_PLACEHOLDER`` when DEEPSEEK_PASSWORD is
      set, and is omitted otherwise;
    * the remaining SENSITIVE_ACCOUNT_FIELDS are omitted;
    * a string "mobile"/"email" equal to the value of one of the
      ACCOUNT_ENV_NAMES variables is replaced by "$<that variable's name>".
    """
    if isinstance(obj, dict):
        sanitized = {}
        for key, value in obj.items():
            if key.startswith("_"):
                continue
            if key == "keys":
                if os.getenv("DEEPSEEK_API_KEY", "").strip():
                    sanitized[key] = [KEYS_PLACEHOLDER]
                # NOTE(review): with DEEPSEEK_API_KEY unset, "keys" is left out
                # of the output entirely — confirm this is intended.
                continue
            if key == "password":
                if os.getenv("DEEPSEEK_PASSWORD", "").strip():
                    sanitized[key] = PASSWORD_PLACEHOLDER
                continue
            if key in SENSITIVE_ACCOUNT_FIELDS:
                continue
            if key in {"mobile", "email"} and isinstance(value, str):
                # Swap an identity that came from the environment for the
                # placeholder naming the first variable that holds it.
                for env_name in ACCOUNT_ENV_NAMES:
                    env_value = os.getenv(env_name, "").strip()
                    if env_value and value == env_value:
                        sanitized[key] = f"${env_name}"
                        break
                else:
                    # No variable matched: keep the identity as it is.
                    sanitized[key] = _sanitize_for_persistence(value)
                continue
            sanitized[key] = _sanitize_for_persistence(value)
        return sanitized
    if isinstance(obj, list):
        return [_sanitize_for_persistence(item) for item in obj]
    return obj
def load_config():
    """Load the JSON config at CONFIG_PATH and overlay environment secrets.

    If opening, parsing or merging the file fails, a warning is logged and
    the result of merging the environment secrets into an empty config is
    returned instead.
    """
    try:
        with open(CONFIG_PATH, mode="r", encoding="utf-8") as config_file:
            file_config = json.load(config_file)
        result = _merge_env_secrets(file_config)
    except Exception as e:
        logger.warning(f"[load_config] 无法读取配置文件: {e}")
        result = _merge_env_secrets({})
    return result
def save_config(cfg):
    """Write the config back to CONFIG_PATH without persisting secrets.

    The config is passed through ``_sanitize_for_persistence`` first, so API
    keys, passwords and session tokens are replaced by placeholders or left
    out. Failures are logged and swallowed (best-effort save).

    Args:
        cfg: The in-memory configuration to persist.
    """
    try:
        # Serialize before opening the file: opening in "w" mode truncates it,
        # so a serialization error must not happen after that point or the
        # existing config would be left empty or half-written.
        payload = json.dumps(
            _sanitize_for_persistence(cfg), ensure_ascii=False, indent=2
        )
        with open(CONFIG_PATH, "w", encoding="utf-8") as f:
            f.write(payload)
    except Exception as e:
        logger.error(f"[save_config] 写入 config.json 失败: {e}")
# Configuration loaded once, when this module is imported.
CONFIG = load_config()