# File: AiCoder/shared/remote_env.py
# Last commit b3f1931 (verified) by MrA7A1: "Sync modernized KAPO runtime from control plane"
"""Encrypted remote .env loader and publisher helpers."""
from __future__ import annotations
import base64
import io
import json
import logging
import os
from typing import Any
import requests
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from dotenv import dotenv_values
def _decode_maybe_b64(value: str) -> str:
    """Return *value* base64-decoded when it is well-formed URL-safe base64.

    The input is stripped first. If it consists solely of base64 alphabet
    characters (padding optional, embedded whitespace tolerated) and the
    decoded bytes are non-empty valid UTF-8, the decoded text is returned.
    In every other case the stripped input is returned unchanged, so plain
    (non-encoded) values pass straight through.

    Note: a plain value that happens to be made up only of base64 alphabet
    characters and decodes to valid UTF-8 is indistinguishable from an
    encoded one and will be decoded.

    Args:
        value: Raw text, possibly base64-encoded. Falsy values are treated
            as the empty string.

    Returns:
        The decoded text, the stripped original text, or ``""`` for empty
        input.
    """
    text = str(value or "").strip()
    if not text:
        return ""

    # Line-wrapped base64 is still accepted: drop whitespace before the
    # padding is computed so the wrap characters do not skew the length.
    compact = "".join(text.split())
    padded = compact + "=" * (-len(compact) % 4)
    try:
        # validate=True rejects characters outside the base64 alphabet rather
        # than silently discarding them; without it a plain value containing
        # punctuation (e.g. "aGVs:") could be "decoded" into unrelated text.
        decoded = base64.b64decode(
            padded.encode("utf-8"), altchars=b"-_", validate=True
        ).decode("utf-8")
    except ValueError:
        # binascii.Error and UnicodeDecodeError are both ValueError
        # subclasses: the value is simply not base64-encoded text.
        return text
    return decoded or text
def _derive_key(password: str, salt: bytes) -> bytes:
    """Stretch *password* into a Fernet-compatible key.

    Runs PBKDF2-HMAC-SHA256 (390000 iterations, 32-byte output) over the
    UTF-8 encoded password with the given salt and returns the result
    URL-safe base64 encoded, which is the key format ``Fernet`` accepts.
    """
    stretcher = PBKDF2HMAC(algorithm=hashes.SHA256(), length=32, salt=salt, iterations=390000)
    secret = password.encode("utf-8")
    raw_key = stretcher.derive(secret)
    return base64.urlsafe_b64encode(raw_key)
def build_remote_env_bundle(env_text: str, password: str, metadata: dict[str, Any] | None = None) -> dict[str, Any]:
    """Encrypt ``.env`` text into a publishable bundle dictionary.

    A fresh 16-byte random salt is generated on every call, a key is derived
    from *password* and that salt, and the UTF-8 encoded *env_text* (empty
    when falsy) is encrypted with Fernet.

    Args:
        env_text: Contents of the ``.env`` file to protect.
        password: Passphrase the key is derived from.
        metadata: Optional extra data stored unencrypted next to the token;
            a shallow copy is placed in the bundle.

    Returns:
        A dict with the keys ``schema``, ``salt`` (URL-safe base64 text),
        ``token`` (Fernet token as text) and ``metadata``.
    """
    fresh_salt = os.urandom(16)
    cipher = Fernet(_derive_key(password, fresh_salt))
    plaintext = (env_text or "").encode("utf-8")
    ciphertext = cipher.encrypt(plaintext)

    bundle: dict[str, Any] = {"schema": "kapo-remote-env-v1"}
    bundle["salt"] = base64.urlsafe_b64encode(fresh_salt).decode("utf-8")
    bundle["token"] = ciphertext.decode("utf-8")
    bundle["metadata"] = dict(metadata or {})
    return bundle
def decrypt_remote_env_bundle(payload: dict[str, Any], password: str) -> str:
    """Decrypt a bundle dictionary back into ``.env`` text.

    Args:
        payload: Mapping holding at least the ``salt`` (URL-safe base64) and
            ``token`` (Fernet token) entries.
        password: Passphrase the decryption key is derived from.

    Returns:
        The decrypted payload decoded as UTF-8 text.

    Raises:
        ValueError: If ``salt`` or ``token`` is missing or blank.
    """
    encoded_salt, ciphertext = (
        str(payload.get(field) or "").strip() for field in ("salt", "token")
    )
    if not (encoded_salt and ciphertext):
        raise ValueError("Remote env payload is incomplete")

    salt_bytes = base64.urlsafe_b64decode(encoded_salt.encode("utf-8"))
    cipher = Fernet(_derive_key(password, salt_bytes))
    plaintext = cipher.decrypt(ciphertext.encode("utf-8"))
    return plaintext.decode("utf-8")
def _remote_env_url() -> str:
    """Return the configured remote bundle URL, or ``""`` when unset.

    Reads ``KAPO_REMOTE_ENV_URL`` first and falls back to
    ``KAPO_REMOTE_ENV_URL_B64``. Each candidate is stripped before the
    choice is made, so a whitespace-only primary variable no longer hides a
    usable fallback. The chosen value is passed through
    ``_decode_maybe_b64``.
    """
    plain = str(os.getenv("KAPO_REMOTE_ENV_URL", "") or "").strip()
    encoded = str(os.getenv("KAPO_REMOTE_ENV_URL_B64", "") or "").strip()
    return _decode_maybe_b64(plain or encoded)
def _remote_env_password() -> str:
    """Return the configured bundle password, or ``""`` when unset.

    Reads ``KAPO_REMOTE_ENV_PASSWORD`` first and falls back to
    ``KAPO_REMOTE_ENV_PASSWORD_B64``. Each candidate is stripped before the
    choice is made, so a whitespace-only primary variable no longer hides a
    usable fallback. The chosen value is passed through
    ``_decode_maybe_b64``.
    """
    plain = str(os.getenv("KAPO_REMOTE_ENV_PASSWORD", "") or "").strip()
    encoded = str(os.getenv("KAPO_REMOTE_ENV_PASSWORD_B64", "") or "").strip()
    return _decode_maybe_b64(plain or encoded)
def load_remote_env_if_configured(*, override: bool = True, logger_name: str = "kapo.remote_env") -> dict[str, str]:
    """Fetch, decrypt and apply the remote ``.env`` bundle when configured.

    Nothing happens (and ``{}`` is returned) when ``KAPO_REMOTE_ENV_LOADED``
    is already set to a truthy flag (``1``/``true``/``yes``/``on``) or when
    the URL or password is not configured. Otherwise the bundle is downloaded
    with a 30 second timeout, decrypted, parsed as dotenv text and written
    into ``os.environ``; ``KAPO_REMOTE_ENV_LOADED`` is then set to ``"1"``.

    Args:
        override: When true, every parsed variable replaces any existing
            value. When false, a variable is only written if it is absent
            from the environment or its current value is blank.
        logger_name: Name of the logger that receives the failure warning.

    Returns:
        The parsed variables (entries with no value are skipped), or ``{}``
        when nothing was loaded. Any failure is logged as a warning with the
        traceback and also yields ``{}``.
    """
    log = logging.getLogger(logger_name)

    loaded_flag = str(os.getenv("KAPO_REMOTE_ENV_LOADED", "")).strip().lower()
    if loaded_flag in {"1", "true", "yes", "on"}:
        return {}

    bundle_url = _remote_env_url()
    bundle_password = _remote_env_password()
    if not (bundle_url and bundle_password):
        return {}

    try:
        reply = requests.get(bundle_url, timeout=30)
        reply.raise_for_status()
        bundle = dict(reply.json() or {})
        plaintext = decrypt_remote_env_bundle(bundle, bundle_password)

        # Keep only named entries that actually carry a value.
        loaded: dict[str, str] = {}
        for name, raw in dotenv_values(stream=io.StringIO(plaintext)).items():
            if name and raw is not None:
                loaded[str(name)] = str(raw)

        for name, text in loaded.items():
            # Without override, an existing non-blank value is preserved.
            if not override and name in os.environ and str(os.getenv(name) or "").strip():
                continue
            os.environ[name] = text

        os.environ["KAPO_REMOTE_ENV_LOADED"] = "1"
        return loaded
    except Exception:
        log.warning("Failed to load encrypted remote env bundle", exc_info=True)
        return {}