| """Encrypted remote .env loader and publisher helpers.""" |
|
|
| from __future__ import annotations |
|
|
| import base64 |
| import io |
| import json |
| import logging |
| import os |
| from typing import Any |
|
|
| import requests |
| from cryptography.fernet import Fernet |
| from cryptography.hazmat.primitives import hashes |
| from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC |
| from dotenv import dotenv_values |
|
|
|
|
def _decode_maybe_b64(value: str) -> str:
    """Return *value* decoded from URL-safe base64 when it is valid base64 text.

    The input is stripped first; an empty or falsy input yields ``""``.
    Missing ``=`` padding is added before decoding, and whitespace embedded in
    the value (for example line-wrapped base64) is ignored.

    Decoding is strict: if the value contains any character outside the base64
    alphabet, or the decoded bytes are not valid UTF-8, or the decoded text is
    empty, the stripped input is returned unchanged.  A lenient decoder would
    silently drop the foreign characters and could turn a plain URL or
    password into unrelated text.

    NOTE: a plain value that happens to be well-formed base64 of valid UTF-8
    is still decoded; that ambiguity is inherent to "maybe base64" inputs.

    Args:
        value: Plain text or URL-safe base64 text (padding optional).

    Returns:
        The decoded text, or the stripped input when it is not decodable.
    """
    text = str(value or "").strip()
    if not text:
        return ""
    # Drop embedded whitespace so wrapped base64 still validates; every other
    # character must belong to the base64 alphabet for the decode to succeed.
    compact = "".join(text.split())
    padded = compact + "=" * (-len(compact) % 4)
    try:
        decoded = base64.b64decode(padded, altchars=b"-_", validate=True).decode("utf-8")
    except ValueError:
        # binascii.Error and UnicodeDecodeError are both ValueError subclasses,
        # as is the error raised for non-ASCII input: treat all as "not base64".
        return text
    return decoded or text
|
|
|
|
def _derive_key(password: str, salt: bytes) -> bytes:
    """Derive a URL-safe base64 key from *password* and *salt*.

    Runs PBKDF2-HMAC-SHA256 with 390000 iterations to produce 32 bytes, then
    encodes them with URL-safe base64.

    Args:
        password: Secret text; encoded as UTF-8 before derivation.
        salt: Salt bytes fed to the key-derivation function.

    Returns:
        The base64-encoded derived key as bytes.
    """
    deriver = PBKDF2HMAC(algorithm=hashes.SHA256(), length=32, salt=salt, iterations=390000)
    raw_key = deriver.derive(password.encode("utf-8"))
    return base64.urlsafe_b64encode(raw_key)
|
|
|
|
def build_remote_env_bundle(env_text: str, password: str, metadata: dict[str, Any] | None = None) -> dict[str, Any]:
    """Encrypt *env_text* under a password-derived key and package the result.

    A fresh 16-byte random salt is generated for every call, the key is
    derived from *password* and that salt, and the text is encrypted with
    Fernet.

    Args:
        env_text: The .env file content; a falsy value is encrypted as "".
        password: Secret used to derive the encryption key.
        metadata: Optional mapping copied (shallowly) into the bundle.

    Returns:
        A dict with the keys ``schema``, ``salt`` (URL-safe base64 text),
        ``token`` (the Fernet token as text) and ``metadata``.
    """
    fresh_salt = os.urandom(16)
    cipher = Fernet(_derive_key(password, fresh_salt))
    plaintext = (env_text or "").encode("utf-8")
    encrypted = cipher.encrypt(plaintext)

    bundle: dict[str, Any] = {"schema": "kapo-remote-env-v1"}
    bundle["salt"] = base64.urlsafe_b64encode(fresh_salt).decode("utf-8")
    bundle["token"] = encrypted.decode("utf-8")
    # Always hand back a new dict so the caller's mapping is never shared.
    bundle["metadata"] = dict(metadata) if metadata else {}
    return bundle
|
|
|
|
def decrypt_remote_env_bundle(payload: dict[str, Any], password: str) -> str:
    """Recover the plaintext .env content from an encrypted bundle.

    Args:
        payload: Mapping expected to carry ``salt`` (URL-safe base64 text) and
            ``token`` (Fernet token text); other keys are ignored here.
        password: Secret used to re-derive the decryption key.

    Returns:
        The decrypted content decoded as UTF-8 text.

    Raises:
        ValueError: If ``salt`` or ``token`` is missing or blank.  Errors from
            base64 decoding of the salt or from Fernet decryption are not
            caught here and propagate to the caller.
    """
    salt_b64, token_text = (str(payload.get(field) or "").strip() for field in ("salt", "token"))
    if not (salt_b64 and token_text):
        raise ValueError("Remote env payload is incomplete")

    salt_bytes = base64.urlsafe_b64decode(salt_b64.encode("utf-8"))
    cipher = Fernet(_derive_key(password, salt_bytes))
    plaintext = cipher.decrypt(token_text.encode("utf-8"))
    return plaintext.decode("utf-8")
|
|
|
|
def _remote_env_url() -> str:
    """Return the remote bundle URL taken from the environment.

    ``KAPO_REMOTE_ENV_URL`` is preferred; ``KAPO_REMOTE_ENV_URL_B64`` is used
    only when the former is unset or empty.  The stripped value is passed
    through ``_decode_maybe_b64`` regardless of which variable supplied it.
    """
    raw = os.getenv("KAPO_REMOTE_ENV_URL", "")
    if not raw:
        raw = os.getenv("KAPO_REMOTE_ENV_URL_B64", "")
    return _decode_maybe_b64(str(raw or "").strip())
|
|
|
|
def _remote_env_password() -> str:
    """Return the bundle password taken from the environment.

    ``KAPO_REMOTE_ENV_PASSWORD`` is preferred; ``KAPO_REMOTE_ENV_PASSWORD_B64``
    is used only when the former is unset or empty.  The stripped value is
    passed through ``_decode_maybe_b64`` regardless of which variable supplied
    it.
    """
    raw = os.getenv("KAPO_REMOTE_ENV_PASSWORD", "")
    if not raw:
        raw = os.getenv("KAPO_REMOTE_ENV_PASSWORD_B64", "")
    return _decode_maybe_b64(str(raw or "").strip())
|
|
|
|
def load_remote_env_if_configured(*, override: bool = True, logger_name: str = "kapo.remote_env") -> dict[str, str]:
    """Fetch, decrypt and apply the remote .env bundle when one is configured.

    Nothing is fetched when ``KAPO_REMOTE_ENV_LOADED`` is already set to one of
    ``1``/``true``/``yes``/``on`` (case-insensitive), or when either the URL or
    the password is not configured.

    Otherwise the bundle is downloaded with a 30 second timeout, decrypted,
    parsed as dotenv text and written into ``os.environ``; afterwards
    ``KAPO_REMOTE_ENV_LOADED`` is set to ``"1"``.

    Args:
        override: When true, every parsed variable replaces the current value.
            When false, a variable is written only if it is absent from the
            environment or currently blank.
        logger_name: Name of the logger that receives the failure warning.

    Returns:
        All parsed key/value pairs (including any that were not applied
        because ``override`` is false), or ``{}`` when loading was skipped or
        failed.  Failures are logged with a traceback and never raised.
    """
    log = logging.getLogger(logger_name)

    loaded_flag = str(os.getenv("KAPO_REMOTE_ENV_LOADED", "")).strip().lower()
    if loaded_flag in {"1", "true", "yes", "on"}:
        return {}

    url = _remote_env_url()
    password = _remote_env_password()
    if not (url and password):
        return {}

    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        bundle = dict(response.json() or {})
        env_text = decrypt_remote_env_bundle(bundle, password)

        # Keep only real assignments: skip empty keys and keys without a value.
        parsed: dict[str, str] = {}
        for name, raw in dotenv_values(stream=io.StringIO(env_text)).items():
            if name and raw is not None:
                parsed[str(name)] = str(raw)

        for name, value in parsed.items():
            # Without override, leave variables that already hold a non-blank value.
            if not override and name in os.environ and str(os.getenv(name) or "").strip():
                continue
            os.environ[name] = value

        os.environ["KAPO_REMOTE_ENV_LOADED"] = "1"
        return parsed
    except Exception:
        # Best-effort loader: a failure must not abort the caller's startup.
        log.warning("Failed to load encrypted remote env bundle", exc_info=True)
        return {}
|
|