"""Prepare the runtime config/database files, then launch the application.

The persisted config never contains real secrets: sensitive values are either
replaced by ``$ENV_NAME`` placeholders (when the matching environment variable
is set) or dropped entirely before the file is written.
"""

import json
import os
import subprocess
import sys
from pathlib import Path

# Keys whose values must never be written to disk verbatim.
SENSITIVE_FIELDS = {"keys", "password", "token", "hif_dliq", "hif_leim"}
KEYS_PLACEHOLDER = "$DEEPSEEK_API_KEY"
PASSWORD_PLACEHOLDER = "$DEEPSEEK_PASSWORD"
# Environment variables that may each hold one account identifier.
ACCOUNT_ENV_NAMES = [
    "DEEPSEEK_ACCOUNT",
    "DEEPSEEK_ACCOUNT2",
    "DEEPSEEK_ACCOUNT3",
    "DEEPSEEK_ACCOUNT4",
]


def _sanitize_for_persistence(obj):
    """Return a copy of *obj* that is safe to write to disk.

    Dicts and lists are walked recursively; any other value is returned as-is.
    Within dicts:

    * keys starting with ``_`` are dropped;
    * ``keys`` becomes ``[KEYS_PLACEHOLDER]`` when ``DEEPSEEK_API_KEY`` is set
      (non-blank), otherwise it is dropped;
    * ``password`` becomes ``PASSWORD_PLACEHOLDER`` when ``DEEPSEEK_PASSWORD``
      is set (non-blank), otherwise it is dropped;
    * every other key in ``SENSITIVE_FIELDS`` is dropped;
    * a string ``mobile``/``email`` equal to the stripped value of one of the
      ``ACCOUNT_ENV_NAMES`` variables is replaced by ``$<ENV_NAME>``.
    """
    if isinstance(obj, dict):
        sanitized = {}
        for key, value in obj.items():
            if key.startswith("_"):
                continue
            if key == "keys":
                if os.getenv("DEEPSEEK_API_KEY", "").strip():
                    sanitized[key] = [KEYS_PLACEHOLDER]
                continue
            if key == "password":
                if os.getenv("DEEPSEEK_PASSWORD", "").strip():
                    sanitized[key] = PASSWORD_PLACEHOLDER
                continue
            if key in SENSITIVE_FIELDS:
                continue
            if key in {"mobile", "email"} and isinstance(value, str):
                for env_name in ACCOUNT_ENV_NAMES:
                    env_value = os.getenv(env_name, "").strip()
                    if env_value and value == env_value:
                        # Persist a reference to the variable, not its value.
                        sanitized[key] = f"${env_name}"
                        break
                else:
                    # No environment variable matched: keep the value.
                    sanitized[key] = _sanitize_for_persistence(value)
                continue
            sanitized[key] = _sanitize_for_persistence(value)
        return sanitized
    if isinstance(obj, list):
        return [_sanitize_for_persistence(item) for item in obj]
    return obj


def _load_existing_config(config_path: Path) -> dict:
    """Load the JSON object stored at *config_path*, or ``{}`` if unusable.

    A missing file, an unreadable/invalid file, or a file whose top-level JSON
    value is not an object all yield an empty dict. The latter two cases emit
    a warning on stderr because the file is about to be overwritten.
    """
    if not config_path.exists():
        return {}
    try:
        loaded = json.loads(config_path.read_text(encoding="utf-8"))
    except Exception as exc:
        # Deliberate best-effort: a broken config must not block startup.
        print(
            f"warning: ignoring unreadable config {config_path}: {exc}",
            file=sys.stderr,
        )
        return {}
    if not isinstance(loaded, dict):
        # The code below relies on dict methods; a list/str/number at the top
        # level would otherwise raise AttributeError.
        print(
            f"warning: ignoring config {config_path}: "
            f"top-level JSON value is not an object",
            file=sys.stderr,
        )
        return {}
    return loaded


def prepare_runtime_files() -> None:
    """Create the data directory, a sanitized config file and the database file.

    Paths come from the environment: ``HF_BUCKET_PATH`` (default ``/data``) is
    the data directory, ``CONFIG_PATH`` and ``DATA_DB_PATH`` default to
    ``config.json`` and ``data.db`` inside it. The existing config (if any) is
    loaded, seeded with placeholder entries derived from the ``DEEPSEEK_*``
    environment variables, sanitized, and written back. The database file is
    created empty if it does not exist.
    """
    data_dir = Path(os.getenv("HF_BUCKET_PATH", "/data"))
    data_dir.mkdir(parents=True, exist_ok=True)

    config_path = Path(os.getenv("CONFIG_PATH", data_dir / "config.json"))
    db_path = Path(os.getenv("DATA_DB_PATH", data_dir / "data.db"))
    config_path.parent.mkdir(parents=True, exist_ok=True)
    db_path.parent.mkdir(parents=True, exist_ok=True)

    config = _load_existing_config(config_path)

    if os.getenv("DEEPSEEK_API_KEY", "").strip():
        # Ensure the key exists so sanitization emits the placeholder for it.
        config.setdefault("keys", [])

    if os.getenv("DEEPSEEK_PASSWORD", "").strip():
        placeholder_accounts = [
            {"mobile": f"${env_name}", "password": PASSWORD_PLACEHOLDER}
            for env_name in ACCOUNT_ENV_NAMES
            if os.getenv(env_name, "").strip()
        ]
        if placeholder_accounts:
            accounts = config.get("accounts")
            # Only seed accounts when none are configured yet.
            if not isinstance(accounts, list) or not accounts:
                config["accounts"] = placeholder_accounts

    config_path.write_text(
        json.dumps(_sanitize_for_persistence(config), ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
    db_path.touch(exist_ok=True)


if __name__ == "__main__":
    prepare_runtime_files()
    # PREPARE_ONLY lets callers set up the files without starting the app.
    if os.getenv("PREPARE_ONLY", "").lower() in {"1", "true", "yes"}:
        sys.exit(0)
    subprocess.run([sys.executable, "app_visualization.py"], check=True)