File size: 3,420 Bytes
b3f1931
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
"""Encrypted remote .env loader and publisher helpers."""

from __future__ import annotations

import base64
import io
import json
import logging
import os
from typing import Any

import requests
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from dotenv import dotenv_values


def _decode_maybe_b64(value: str) -> str:
    """Return *value* base64-decoded when it looks like base64, else unchanged.

    The input is coerced to ``str`` and stripped. It is decoded only when
    every character (ignoring embedded whitespace) belongs to the standard or
    URL-safe base64 alphabet, the bytes decode cleanly, and the result is
    non-empty valid UTF-8. In every other case the stripped input is returned
    as-is, so plain values such as URLs pass through untouched.

    NOTE: this is a heuristic. A plain value that happens to be valid base64
    of valid UTF-8 text is still decoded.

    Args:
        value: Plain or base64-encoded text; falsy values yield ``""``.

    Returns:
        The decoded text, or the stripped original when it is not base64.
    """
    text = str(value or "").strip()
    if not text:
        return ""

    # Tolerate line-wrapped base64 by ignoring embedded whitespace.
    compact = "".join(text.split())

    # The non-strict decoder silently discards characters outside the
    # alphabet, so without this check a plain value containing punctuation
    # could be "decoded" from whatever alphabet characters remain.
    allowed = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/-_="
    if not all(ch in allowed for ch in compact):
        return text

    # Restore padding that is commonly dropped from URL-safe base64.
    padded = compact + "=" * (-len(compact) % 4)
    try:
        decoded = base64.urlsafe_b64decode(padded.encode("ascii")).decode("utf-8")
    except ValueError:
        # binascii.Error (bad padding/length) and UnicodeDecodeError are both
        # ValueError subclasses: the value was not base64-encoded text.
        return text
    return decoded or text


def _derive_key(password: str, salt: bytes) -> bytes:
    """Derive a URL-safe base64 key from *password* and *salt*.

    Runs PBKDF2-HMAC with SHA-256 for 390000 iterations over the UTF-8 bytes
    of *password* to produce 32 raw bytes, then URL-safe base64-encodes them.

    Args:
        password: Secret the key is derived from.
        salt: Salt bytes mixed into the derivation.

    Returns:
        The base64-encoded derived key as bytes.
    """
    secret = password.encode("utf-8")
    raw_key = PBKDF2HMAC(algorithm=hashes.SHA256(), length=32, salt=salt, iterations=390000).derive(secret)
    return base64.urlsafe_b64encode(raw_key)


def build_remote_env_bundle(env_text: str, password: str, metadata: dict[str, Any] | None = None) -> dict[str, Any]:
    """Encrypt *env_text* under *password* and return a serialisable bundle.

    A fresh 16-byte random salt is generated on every call, a key is derived
    from it with ``_derive_key``, and the UTF-8 bytes of *env_text* (an empty
    string when falsy) are encrypted with Fernet.

    Args:
        env_text: Text to encrypt.
        password: Secret used to derive the encryption key.
        metadata: Optional mapping copied into the bundle; defaults to ``{}``.

    Returns:
        A dict with the keys ``schema``, ``salt`` (URL-safe base64 text),
        ``token`` (the Fernet token as text) and ``metadata``.
    """
    salt_bytes = os.urandom(16)
    cipher = Fernet(_derive_key(password, salt_bytes))
    plaintext = (env_text or "").encode("utf-8")
    ciphertext = cipher.encrypt(plaintext)

    bundle: dict[str, Any] = {}
    bundle["schema"] = "kapo-remote-env-v1"
    bundle["salt"] = base64.urlsafe_b64encode(salt_bytes).decode("utf-8")
    bundle["token"] = ciphertext.decode("utf-8")
    bundle["metadata"] = dict(metadata or {})
    return bundle


def decrypt_remote_env_bundle(payload: dict[str, Any], password: str) -> str:
    """Decrypt a bundle dict back into its text.

    Reads the ``salt`` and ``token`` entries of *payload*, derives the key
    from *password* and the decoded salt with ``_derive_key``, and decrypts
    the token with Fernet.

    Args:
        payload: Mapping carrying ``salt`` (URL-safe base64) and ``token``.
        password: Secret used to derive the decryption key.

    Returns:
        The decrypted content decoded as UTF-8.

    Raises:
        ValueError: If ``salt`` or ``token`` is missing or blank. Errors
            raised by base64 decoding or by Fernet decryption propagate
            unchanged.
    """
    salt_b64 = str(payload.get("salt") or "").strip()
    ciphertext = str(payload.get("token") or "").strip()
    if not (salt_b64 and ciphertext):
        raise ValueError("Remote env payload is incomplete")

    salt_bytes = base64.urlsafe_b64decode(salt_b64.encode("utf-8"))
    cipher = Fernet(_derive_key(password, salt_bytes))
    plaintext = cipher.decrypt(ciphertext.encode("utf-8"))
    return plaintext.decode("utf-8")


def _remote_env_url() -> str:
    """Return the remote env bundle URL read from the process environment.

    ``KAPO_REMOTE_ENV_URL`` is consulted first; ``KAPO_REMOTE_ENV_URL_B64`` is
    used only when the former is unset or empty. The chosen value is stripped
    and passed through ``_decode_maybe_b64``. When neither variable holds a
    value, the empty string is handed to that helper.
    """
    for var_name in ("KAPO_REMOTE_ENV_URL", "KAPO_REMOTE_ENV_URL_B64"):
        candidate = os.getenv(var_name, "")
        if candidate:
            return _decode_maybe_b64(str(candidate).strip())
    return _decode_maybe_b64("")


def _remote_env_password() -> str:
    """Return the remote env bundle password read from the process environment.

    ``KAPO_REMOTE_ENV_PASSWORD`` is consulted first;
    ``KAPO_REMOTE_ENV_PASSWORD_B64`` is used only when the former is unset or
    empty. The chosen value is stripped and passed through
    ``_decode_maybe_b64``. When neither variable holds a value, the empty
    string is handed to that helper.
    """
    for var_name in ("KAPO_REMOTE_ENV_PASSWORD", "KAPO_REMOTE_ENV_PASSWORD_B64"):
        candidate = os.getenv(var_name, "")
        if candidate:
            return _decode_maybe_b64(str(candidate).strip())
    return _decode_maybe_b64("")


def load_remote_env_if_configured(*, override: bool = True, logger_name: str = "kapo.remote_env") -> dict[str, str]:
    """Fetch, decrypt and apply the remote env bundle when one is configured.

    Does nothing and returns ``{}`` when ``KAPO_REMOTE_ENV_LOADED`` is already
    one of ``1``/``true``/``yes``/``on`` (case-insensitive), or when either
    the URL or the password is not configured.

    Otherwise the bundle is downloaded with an HTTP GET (30 second timeout),
    decrypted, parsed with ``dotenv_values`` and written into ``os.environ``.
    Entries with an empty key or a ``None`` value are dropped. On success
    ``KAPO_REMOTE_ENV_LOADED`` is set to ``"1"``.

    Args:
        override: When true, every parsed variable replaces the current one.
            When false, a variable is only set if it is currently missing or
            blank.
        logger_name: Name of the logger used to report failures.

    Returns:
        The parsed variables, whether or not each one was applied, or ``{}``
        when nothing was loaded. Any exception raised while loading is logged
        as a warning with its traceback and also yields ``{}``.
    """
    log = logging.getLogger(logger_name)

    loaded_flag = str(os.getenv("KAPO_REMOTE_ENV_LOADED", "")).strip().lower()
    if loaded_flag in {"1", "true", "yes", "on"}:
        return {}

    url = _remote_env_url()
    password = _remote_env_password()
    if not (url and password):
        return {}

    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        bundle = dict(response.json() or {})
        plaintext = decrypt_remote_env_bundle(bundle, password)

        loaded: dict[str, str] = {}
        for name, raw in dotenv_values(stream=io.StringIO(plaintext)).items():
            if not name or raw is None:
                continue
            loaded[str(name)] = str(raw)

        for name, value in loaded.items():
            if not override:
                existing = os.environ.get(name)
                # Keep variables that already carry a non-blank value.
                if existing is not None and str(existing).strip():
                    continue
            os.environ[name] = value

        os.environ["KAPO_REMOTE_ENV_LOADED"] = "1"
        return loaded
    except Exception:
        # Best-effort by design: a failed remote load must not crash startup.
        log.warning("Failed to load encrypted remote env bundle", exc_info=True)
        return {}