| import os |
| import base64 |
| import json |
| from cryptography.hazmat.primitives.ciphers.aead import AESGCM |
| from cryptography.hazmat.primitives import hashes |
| from cryptography.hazmat.primitives.kdf.hkdf import HKDF |
|
|
|
|
class CredentialVault:
    """Encrypt and decrypt credential payloads with AES-GCM.

    The AES key is derived with HKDF-SHA256 from a master key. The master
    key is read from ``master_key_path`` as hex text; when that file does
    not exist, a new random key is generated and written there so that
    later processes derive the same encryption key.
    """

    _KEY_SIZE = 32    # bytes: generated master key and derived AES key
    _NONCE_SIZE = 12  # bytes: random nonce prepended to each ciphertext

    def __init__(self, master_key_path: str = "/data/vault/master.key"):
        """Create a vault and load (or create) its master key.

        Args:
            master_key_path: Location of the hex-encoded master key file.
        """
        self.master_key_path = master_key_path
        self._key = self._load_key()

    def _read_key_file(self) -> bytes:
        """Read and decode the hex-encoded master key file.

        Raises:
            OSError: If the file cannot be opened or read.
            ValueError: If the content is not valid hex or decodes to an
                empty key.
        """
        with open(self.master_key_path, "r", encoding="ascii") as key_file:
            key = bytes.fromhex(key_file.read().strip())
        if not key:
            # An empty master key would make the derived AES key depend only
            # on the public salt/info constants.
            raise ValueError(
                f"master key file {self.master_key_path!r} is empty"
            )
        return key

    def _write_key_file(self, key: bytes) -> None:
        """Persist *key* as hex text without overwriting an existing file.

        Raises:
            OSError: If the directory or file cannot be created or written
                (``FileExistsError`` when the key file already exists).
        """
        parent = os.path.dirname(self.master_key_path)
        if parent:
            os.makedirs(parent, mode=0o700, exist_ok=True)
        # O_EXCL guarantees an existing key is never clobbered; 0o600 keeps
        # the key readable by the owning user only.
        fd = os.open(
            self.master_key_path,
            os.O_WRONLY | os.O_CREAT | os.O_EXCL,
            0o600,
        )
        try:
            with os.fdopen(fd, "w", encoding="ascii") as key_file:
                key_file.write(key.hex() + "\n")
        except OSError:
            # Do not leave a truncated key file behind for later readers.
            os.unlink(self.master_key_path)
            raise

    def _load_key(self) -> bytes:
        """Return the master key, creating and persisting one if missing.

        Returns:
            The key decoded from ``master_key_path`` when that file exists,
            otherwise a newly generated 32-byte random key.

        Raises:
            OSError: If an existing key file cannot be read.
            ValueError: If an existing key file is empty or not valid hex.
        """
        if os.path.exists(self.master_key_path):
            return self._read_key_file()

        key = os.urandom(self._KEY_SIZE)
        try:
            self._write_key_file(key)
        except OSError as exc:
            if os.path.isfile(self.master_key_path):
                # The file appeared after the existence check (e.g. created
                # by another process); use that key so both sides agree.
                return self._read_key_file()
            # Local import: the module's import block is left untouched.
            import warnings

            warnings.warn(
                f"could not persist new master key to "
                f"{self.master_key_path!r} ({exc}); using an in-memory key, "
                f"data encrypted now cannot be decrypted after a restart",
                RuntimeWarning,
                stacklevel=2,
            )
        return key

    def _derive_key(self) -> bytes:
        """Derive the 32-byte AES key from the master key via HKDF-SHA256."""
        # An HKDF instance can only be used for a single derive() call, so a
        # new one is built each time.
        hkdf = HKDF(
            algorithm=hashes.SHA256(),
            length=self._KEY_SIZE,
            salt=b"platform-vault",
            info=b"credential-encryption",
        )
        return hkdf.derive(self._key)

    def encrypt(self, plaintext: str) -> str:
        """Encrypt *plaintext* and return it as a base64 string.

        The decoded output is a 12-byte random nonce followed by the AES-GCM
        ciphertext (which includes the authentication tag).
        """
        aesgcm = AESGCM(self._derive_key())
        nonce = os.urandom(self._NONCE_SIZE)
        ciphertext = aesgcm.encrypt(nonce, plaintext.encode(), None)
        return base64.b64encode(nonce + ciphertext).decode()

    def decrypt(self, ciphertext_b64: str) -> str:
        """Decrypt a base64 string produced by :meth:`encrypt`.

        Raises:
            cryptography.exceptions.InvalidTag: If the data was tampered
                with or was encrypted under a different key.
            ValueError: If the input is not valid base64 or the nonce part
                is too short for AES-GCM.
        """
        combined = base64.b64decode(ciphertext_b64)
        nonce = combined[:self._NONCE_SIZE]
        ciphertext = combined[self._NONCE_SIZE:]
        aesgcm = AESGCM(self._derive_key())
        return aesgcm.decrypt(nonce, ciphertext, None).decode()

    def store_credentials(self, credentials: dict) -> str:
        """Serialize *credentials* to JSON and return the encrypted blob."""
        return self.encrypt(json.dumps(credentials))

    def load_credentials(self, encrypted_blob: str) -> dict:
        """Decrypt *encrypted_blob* and parse the JSON payload it contains."""
        return json.loads(self.decrypt(encrypted_blob))
|
|
|
|
# Module-level vault instance built with the default master key path.
# Constructing it loads the master key, so importing this module touches
# the filesystem.
vault = CredentialVault()