"""Encrypted remote .env loader and publisher helpers.""" from __future__ import annotations import base64 import io import json import logging import os from typing import Any import requests from cryptography.fernet import Fernet from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from dotenv import dotenv_values def _decode_maybe_b64(value: str) -> str: text = str(value or "").strip() if not text: return "" try: padded = text + "=" * (-len(text) % 4) decoded = base64.urlsafe_b64decode(padded.encode("utf-8")).decode("utf-8") if decoded: return decoded except Exception: pass return text def _derive_key(password: str, salt: bytes) -> bytes: kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, salt=salt, iterations=390000, ) return base64.urlsafe_b64encode(kdf.derive(password.encode("utf-8"))) def build_remote_env_bundle(env_text: str, password: str, metadata: dict[str, Any] | None = None) -> dict[str, Any]: salt = os.urandom(16) key = _derive_key(password, salt) token = Fernet(key).encrypt((env_text or "").encode("utf-8")) return { "schema": "kapo-remote-env-v1", "salt": base64.urlsafe_b64encode(salt).decode("utf-8"), "token": token.decode("utf-8"), "metadata": dict(metadata or {}), } def decrypt_remote_env_bundle(payload: dict[str, Any], password: str) -> str: salt_raw = str(payload.get("salt") or "").strip() token = str(payload.get("token") or "").strip() if not salt_raw or not token: raise ValueError("Remote env payload is incomplete") salt = base64.urlsafe_b64decode(salt_raw.encode("utf-8")) key = _derive_key(password, salt) return Fernet(key).decrypt(token.encode("utf-8")).decode("utf-8") def _remote_env_url() -> str: return _decode_maybe_b64( str(os.getenv("KAPO_REMOTE_ENV_URL", "") or os.getenv("KAPO_REMOTE_ENV_URL_B64", "") or "").strip() ) def _remote_env_password() -> str: return _decode_maybe_b64( str(os.getenv("KAPO_REMOTE_ENV_PASSWORD", "") or os.getenv("KAPO_REMOTE_ENV_PASSWORD_B64", "") or "").strip() ) def load_remote_env_if_configured(*, override: bool = True, logger_name: str = "kapo.remote_env") -> dict[str, str]: logger = logging.getLogger(logger_name) if str(os.getenv("KAPO_REMOTE_ENV_LOADED", "")).strip().lower() in {"1", "true", "yes", "on"}: return {} url = _remote_env_url() password = _remote_env_password() if not url or not password: return {} try: response = requests.get(url, timeout=30) response.raise_for_status() payload = dict(response.json() or {}) env_text = decrypt_remote_env_bundle(payload, password) parsed = { str(key): str(value) for key, value in dotenv_values(stream=io.StringIO(env_text)).items() if key and value is not None } for key, value in parsed.items(): if override or key not in os.environ or not str(os.getenv(key) or "").strip(): os.environ[key] = value os.environ["KAPO_REMOTE_ENV_LOADED"] = "1" return parsed except Exception: logger.warning("Failed to load encrypted remote env bundle", exc_info=True) return {}