Compost / backend /vault.py
abc1181's picture
feat: full platform with dynamic Composio sync, local DB cache, React UI
f9474bf
Raw
History Blame Contribute Delete
1.59 kB
import os
import base64
import json
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
class CredentialVault:
def __init__(self, master_key_path: str = "/data/vault/master.key"):
self.master_key_path = master_key_path
self._key = self._load_key()
def _load_key(self) -> bytes:
if os.path.exists(self.master_key_path):
with open(self.master_key_path, "r") as f:
return bytes.fromhex(f.read().strip())
return os.urandom(32)
def _derive_key(self) -> bytes:
hkdf = HKDF(
algorithm=hashes.SHA256(),
length=32,
salt=b"platform-vault",
info=b"credential-encryption"
)
return hkdf.derive(self._key)
def encrypt(self, plaintext: str) -> str:
aesgcm = AESGCM(self._derive_key())
nonce = os.urandom(12)
ciphertext = aesgcm.encrypt(nonce, plaintext.encode(), None)
return base64.b64encode(nonce + ciphertext).decode()
def decrypt(self, ciphertext_b64: str) -> str:
combined = base64.b64decode(ciphertext_b64)
aesgcm = AESGCM(self._derive_key())
return aesgcm.decrypt(combined[:12], combined[12:], None).decode()
def store_credentials(self, credentials: dict) -> str:
return self.encrypt(json.dumps(credentials))
def load_credentials(self, encrypted_blob: str) -> dict:
return json.loads(self.decrypt(encrypted_blob))
vault = CredentialVault()