Spaces:
Sleeping
Sleeping
| """ | |
| Encryption utilities using NaCl (libsodium) | |
| """ | |
| import base64 | |
| import gzip | |
| import json | |
| from nacl.secret import SecretBox | |
| from nacl.utils import random | |
class CryptoManager:
    """Symmetric encryption helper built around a NaCl ``SecretBox``.

    The instance holds a 32-byte secret key and a ``SecretBox`` created from
    it.  It offers raw byte encryption/decryption plus a JSON convenience
    layer that gzip-compresses and base64-wraps the payload before
    encrypting it.
    """

    def __init__(self, secret_key_hex: str):
        """Build the ``SecretBox`` from a key string (typically read from .env).

        Args:
            secret_key_hex: Either a 64-character hex string (decoded to
                32 bytes), or -- as a tolerated fallback -- a 32-character
                string whose UTF-8 encoding is used directly as the key.

        Raises:
            ValueError: If the key is empty, has an unsupported length,
                contains non-hex characters in the 64-character form, or
                does not end up being exactly 32 bytes.
        """
        if not secret_key_hex:
            raise ValueError("Secret key is required")

        key_length = len(secret_key_hex)
        if key_length == 64:
            try:
                self.secret_key = bytes.fromhex(secret_key_hex)
            except ValueError as e:
                # bytes.fromhex reports only a position; give the caller a
                # message that names the actual requirement.  The key itself
                # is deliberately kept out of the message.
                raise ValueError(
                    "Secret key must be 64 hex characters "
                    "(found non-hex characters)"
                ) from e
        elif key_length == 32:
            # Fallback: the 32 characters are NOT hex-decoded (that would
            # give only 16 bytes); their UTF-8 bytes become the key as-is.
            print("⚠️ WARNING: Key is only 32 characters (16 bytes)")
            print(" Should be 64 hex characters for 32 bytes")
            self.secret_key = secret_key_hex.encode('utf-8')
        else:
            raise ValueError(f"Secret key must be 64 hex characters (got {key_length})")

        # Catches hex input containing whitespace (fewer bytes decoded) and
        # 32-character keys with multi-byte characters (more bytes encoded).
        if len(self.secret_key) != 32:
            raise ValueError(f"Secret key must be 32 bytes (got {len(self.secret_key)} bytes)")

        self.box = SecretBox(self.secret_key)
        print(f"✓ CryptoManager initialized (key: {len(self.secret_key)} bytes)")

    def encrypt(self, plaintext: bytes, nonce: bytes = None) -> bytes:
        """Encrypt plaintext bytes with the instance's ``SecretBox``.

        Args:
            plaintext: Raw bytes to encrypt.
            nonce: Nonce to use.  When ``None`` a random nonce of
                ``SecretBox.NONCE_SIZE`` bytes is generated.  A caller that
                supplies its own nonce must never reuse it with the same key.

        Returns:
            Whatever ``SecretBox.encrypt`` returns for the given nonce.
        """
        if nonce is None:
            nonce = random(SecretBox.NONCE_SIZE)
        return self.box.encrypt(plaintext, nonce)

    def decrypt(self, ciphertext: str, nonce: str) -> bytes:
        """Decrypt base64-encoded ciphertext with a base64-encoded nonce.

        Args:
            ciphertext: Base64 text of the encrypted payload.
            nonce: Base64 text of the nonce used for encryption.

        Returns:
            The decrypted plaintext bytes.

        Raises:
            ValueError: If base64 decoding or decryption fails; the
                underlying exception is attached as ``__cause__``.
        """
        try:
            ciphertext_bytes = base64.b64decode(ciphertext)
            nonce_bytes = base64.b64decode(nonce)
            return self.box.decrypt(ciphertext_bytes, nonce_bytes)
        except Exception as e:
            # Single error type for callers; keep the root cause chained.
            raise ValueError(f"Decryption failed. {e}") from e

    def encrypt_json(self, data: dict) -> dict:
        """Serialize, compress and encrypt a JSON-compatible dict.

        Pipeline: ``json.dumps`` -> UTF-8 -> gzip (level 6) -> base64 text
        -> encrypt with a fresh random nonce.  The inner base64 layer is
        part of the payload format that ``decrypt_json`` expects, so it is
        kept as-is.

        Args:
            data: Object to serialize with ``json.dumps``.

        Returns:
            A dict with two base64 strings: ``"ciphertext"`` (the
            ``.ciphertext`` part of the encryption result) and ``"nonce"``.
        """
        serialized = json.dumps(data).encode('utf-8')
        compressed = gzip.compress(serialized, compresslevel=6)
        # Encrypt the base64 text of the compressed bytes, not the raw bytes.
        inner_payload = base64.b64encode(compressed)

        nonce = random(SecretBox.NONCE_SIZE)
        encrypted = self.box.encrypt(inner_payload, nonce)
        return {
            "ciphertext": base64.b64encode(encrypted.ciphertext).decode('utf-8'),
            "nonce": base64.b64encode(nonce).decode('utf-8'),
        }

    def decrypt_json(self, ciphertext: str, nonce: str) -> dict:
        """Decrypt and decompress a payload produced by ``encrypt_json``.

        Reverses the pipeline: decrypt -> base64-decode -> gunzip ->
        ``json.loads``.

        Args:
            ciphertext: Base64 text of the encrypted payload.
            nonce: Base64 text of the nonce used for encryption.

        Returns:
            The decoded JSON value.

        Raises:
            ValueError: If any stage (decryption, base64, gzip, JSON)
                fails; the underlying exception is attached as
                ``__cause__``.
        """
        try:
            decrypted = self.decrypt(ciphertext, nonce)
            compressed_bytes = base64.b64decode(decrypted.decode('utf-8'))
            decompressed = gzip.decompress(compressed_bytes)
            return json.loads(decompressed.decode('utf-8'))
        except Exception as e:
            raise ValueError(f"Decryption/decompression failed. {e}") from e