File size: 5,238 Bytes
33f27ba
 
 
 
 
 
 
db95bcf
46e1036
 
624fe7f
db95bcf
 
d933d48
 
 
 
 
db95bcf
 
 
 
 
 
 
 
d933d48
 
 
 
 
 
 
 
 
 
 
 
 
 
 
db95bcf
 
 
 
 
 
 
d933d48
 
db95bcf
 
 
d933d48
 
 
 
 
db95bcf
 
 
 
 
d933d48
 
 
 
 
 
 
 
bc91c2e
 
 
 
 
db95bcf
d933d48
db95bcf
d933d48
db95bcf
 
d933d48
 
db95bcf
 
 
 
 
 
 
 
 
 
 
 
46e1036
 
 
 
 
 
db95bcf
 
 
d933d48
 
 
 
 
 
 
 
46e1036
db95bcf
 
 
 
 
33f27ba
 
 
 
 
 
db95bcf
33f27ba
 
db95bcf
33f27ba
 
 
db95bcf
33f27ba
db95bcf
33f27ba
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import json
import logging
import os

logger = logging.getLogger(__name__)

CONFIG_PATH = os.getenv("CONFIG_PATH", "config.json")
SENSITIVE_ACCOUNT_FIELDS = {"password", "token", "hif_dliq", "hif_leim"}
KEYS_PLACEHOLDER = "$DEEPSEEK_API_KEY"
PASSWORD_PLACEHOLDER = "$DEEPSEEK_PASSWORD"
ACCOUNT_ENV_NAMES = ["DEEPSEEK_ACCOUNT", "DEEPSEEK_ACCOUNT2", "DEEPSEEK_ACCOUNT3", "DEEPSEEK_ACCOUNT4"]


def _account_placeholders() -> set[str]:
    return {f"${name}" for name in ACCOUNT_ENV_NAMES}


def _account_from_value(account: str, password: str) -> dict:
    env_account = {"password": password, "token": ""}
    if "@" in account:
        env_account["email"] = account
    else:
        env_account["mobile"] = account
    return env_account


def _accounts_from_env() -> list[dict]:
    password = os.getenv("DEEPSEEK_PASSWORD", "").strip()
    if not password:
        return []
    accounts = []
    seen = set()
    for env_name in ACCOUNT_ENV_NAMES:
        account = os.getenv(env_name, "").strip()
        if not account or account in seen:
            continue
        seen.add(account)
        accounts.append(_account_from_value(account, password))
    return accounts


def _merge_env_secrets(config: dict) -> dict:
    merged = dict(config) if isinstance(config, dict) else {}

    api_key = os.getenv("DEEPSEEK_API_KEY", "").strip()
    if api_key:
        merged["keys"] = [api_key]

    env_accounts = _accounts_from_env()
    if env_accounts:
        accounts = merged.get("accounts")
        if not isinstance(accounts, list):
            accounts = []
        placeholders = _account_placeholders()
        desired_accounts = {
            (account.get("email") or account.get("mobile")): account
            for account in env_accounts
        }
        merged_accounts = []
        for account in accounts:
            if not isinstance(account, dict):
                continue
            current_identity = account.get("email") or account.get("mobile")
            if current_identity in desired_accounts or current_identity in placeholders:
                desired_account = desired_accounts.get(current_identity)
                if desired_account is None and len(desired_accounts) == 1:
                    desired_account = next(iter(desired_accounts.values()))
                if desired_account is None:
                    continue
                desired_identity = desired_account.get("email") or desired_account.get("mobile")
                identity_key = "email" if "email" in desired_account else "mobile"
                updated = {
                    k: v
                    for k, v in account.items()
                    if k not in {"email", "mobile", "password", "token", "hif_dliq", "hif_leim"}
                }
                updated[identity_key] = desired_identity
                updated["password"] = desired_account["password"]
                merged_accounts.append(updated)
                desired_accounts.pop(desired_identity, None)
            else:
                merged_accounts.append(account)
        for remaining in desired_accounts.values():
            merged_accounts.append(remaining)
        merged["accounts"] = merged_accounts

    return merged


def _sanitize_for_persistence(obj):
    if isinstance(obj, dict):
        sanitized = {}
        for key, value in obj.items():
            if key.startswith("_"):
                continue
            if key == "keys":
                if os.getenv("DEEPSEEK_API_KEY", "").strip():
                    sanitized[key] = [KEYS_PLACEHOLDER]
                continue
            if key == "password":
                if os.getenv("DEEPSEEK_PASSWORD", "").strip():
                    sanitized[key] = PASSWORD_PLACEHOLDER
                continue
            if key in SENSITIVE_ACCOUNT_FIELDS:
                continue
            if key in {"mobile", "email"} and isinstance(value, str):
                for env_name in ACCOUNT_ENV_NAMES:
                    env_value = os.getenv(env_name, "").strip()
                    if env_value and value == env_value:
                        sanitized[key] = f"${env_name}"
                        break
                else:
                    sanitized[key] = _sanitize_for_persistence(value)
                continue
            sanitized[key] = _sanitize_for_persistence(value)
        return sanitized
    if isinstance(obj, list):
        return [_sanitize_for_persistence(item) for item in obj]
    return obj


def load_config():
    """从 config.json 加载配置,出错则返回空 dict"""
    try:
        with open(CONFIG_PATH, "r", encoding="utf-8") as f:
            return _merge_env_secrets(json.load(f))
    except Exception as e:
        logger.warning(f"[load_config] 无法读取配置文件: {e}")
        return _merge_env_secrets({})


def save_config(cfg):
    """将配置写回 config.json,但不持久化密钥、密码或会话令牌。"""
    try:
        cfg_clean = _sanitize_for_persistence(cfg)
        with open(CONFIG_PATH, "w", encoding="utf-8") as f:
            json.dump(cfg_clean, f, ensure_ascii=False, indent=2)
    except Exception as e:
        logger.error(f"[save_config] 写入 config.json 失败: {e}")


CONFIG = load_config()