import os import base64 import json from cryptography.hazmat.primitives.ciphers.aead import AESGCM from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.hkdf import HKDF class CredentialVault: def __init__(self, master_key_path: str = "/data/vault/master.key"): self.master_key_path = master_key_path self._key = self._load_key() def _load_key(self) -> bytes: if os.path.exists(self.master_key_path): with open(self.master_key_path, "r") as f: return bytes.fromhex(f.read().strip()) return os.urandom(32) def _derive_key(self) -> bytes: hkdf = HKDF( algorithm=hashes.SHA256(), length=32, salt=b"platform-vault", info=b"credential-encryption" ) return hkdf.derive(self._key) def encrypt(self, plaintext: str) -> str: aesgcm = AESGCM(self._derive_key()) nonce = os.urandom(12) ciphertext = aesgcm.encrypt(nonce, plaintext.encode(), None) return base64.b64encode(nonce + ciphertext).decode() def decrypt(self, ciphertext_b64: str) -> str: combined = base64.b64decode(ciphertext_b64) aesgcm = AESGCM(self._derive_key()) return aesgcm.decrypt(combined[:12], combined[12:], None).decode() def store_credentials(self, credentials: dict) -> str: return self.encrypt(json.dumps(credentials)) def load_credentials(self, encrypted_blob: str) -> dict: return json.loads(self.decrypt(encrypted_blob)) vault = CredentialVault()