| import json |
| import os |
| import subprocess |
| import sys |
| from pathlib import Path |
|
|
|
|
| SENSITIVE_FIELDS = {"keys", "password", "token", "hif_dliq", "hif_leim"} |
| KEYS_PLACEHOLDER = "$DEEPSEEK_API_KEY" |
| PASSWORD_PLACEHOLDER = "$DEEPSEEK_PASSWORD" |
| ACCOUNT_ENV_NAMES = ["DEEPSEEK_ACCOUNT", "DEEPSEEK_ACCOUNT2", "DEEPSEEK_ACCOUNT3", "DEEPSEEK_ACCOUNT4"] |
|
|
|
|
| def _sanitize_for_persistence(obj): |
| if isinstance(obj, dict): |
| sanitized = {} |
| for key, value in obj.items(): |
| if key.startswith("_"): |
| continue |
| if key == "keys": |
| if os.getenv("DEEPSEEK_API_KEY", "").strip(): |
| sanitized[key] = [KEYS_PLACEHOLDER] |
| continue |
| if key == "password": |
| if os.getenv("DEEPSEEK_PASSWORD", "").strip(): |
| sanitized[key] = PASSWORD_PLACEHOLDER |
| continue |
| if key in SENSITIVE_FIELDS: |
| continue |
| if key in {"mobile", "email"} and isinstance(value, str): |
| for env_name in ACCOUNT_ENV_NAMES: |
| env_value = os.getenv(env_name, "").strip() |
| if env_value and value == env_value: |
| sanitized[key] = f"${env_name}" |
| break |
| else: |
| sanitized[key] = _sanitize_for_persistence(value) |
| continue |
| sanitized[key] = _sanitize_for_persistence(value) |
| return sanitized |
| if isinstance(obj, list): |
| return [_sanitize_for_persistence(item) for item in obj] |
| return obj |
|
|
|
|
| def prepare_runtime_files() -> None: |
| data_dir = Path(os.getenv("HF_BUCKET_PATH", "/data")) |
| data_dir.mkdir(parents=True, exist_ok=True) |
|
|
| config_path = Path(os.getenv("CONFIG_PATH", data_dir / "config.json")) |
| db_path = Path(os.getenv("DATA_DB_PATH", data_dir / "data.db")) |
| config_path.parent.mkdir(parents=True, exist_ok=True) |
| db_path.parent.mkdir(parents=True, exist_ok=True) |
|
|
| config = {} |
| if config_path.exists(): |
| try: |
| config = json.loads(config_path.read_text(encoding="utf-8")) |
| except Exception: |
| config = {} |
|
|
| if os.getenv("DEEPSEEK_API_KEY", "").strip(): |
| config.setdefault("keys", []) |
| if os.getenv("DEEPSEEK_PASSWORD", "").strip(): |
| placeholder_accounts = [] |
| for env_name in ACCOUNT_ENV_NAMES: |
| if os.getenv(env_name, "").strip(): |
| placeholder_accounts.append({"mobile": f"${env_name}", "password": PASSWORD_PLACEHOLDER}) |
| if placeholder_accounts: |
| accounts = config.get("accounts") |
| if not isinstance(accounts, list) or not accounts: |
| config["accounts"] = placeholder_accounts |
|
|
| config_path.write_text( |
| json.dumps(_sanitize_for_persistence(config), ensure_ascii=False, indent=2), |
| encoding="utf-8", |
| ) |
| db_path.touch(exist_ok=True) |
|
|
|
|
| if __name__ == "__main__": |
| prepare_runtime_files() |
| if os.getenv("PREPARE_ONLY", "").lower() in {"1", "true", "yes"}: |
| sys.exit(0) |
| subprocess.run([sys.executable, "app_visualization.py"], check=True) |
|
|