Spaces:
Running
Running
| """ | |
| AES-256-GCM Encryption at Rest for HIPAA Compliance | |
| Provides: | |
| - AES-256-GCM authenticated encryption for PHI data (transcripts, audio, SOAP notes) | |
| - Key derivation from a master secret using PBKDF2-HMAC-SHA256 | |
| - Unique nonce per encryption operation (12-byte random) | |
| - Base64-encoded ciphertext for safe storage in text DB columns | |
| Security properties: | |
| - Confidentiality + integrity (GCM authentication tag) | |
| - No nonce reuse (random 12 bytes per operation) | |
| - Key stretching via PBKDF2 with configurable iterations | |
| """ | |
| import base64 | |
| import hashlib | |
| import hmac | |
| import os | |
| import struct | |
| import logging | |
| from typing import Optional | |
| from app.config import settings | |
| logger = logging.getLogger(__name__) | |
| # Format: VERSION(1) || NONCE(12) || TAG(16) || CIPHERTEXT(variable) | |
| _VERSION = 1 | |
| _NONCE_SIZE = 12 | |
| _TAG_SIZE = 16 | |
| _KEY_SIZE = 32 # 256 bits | |
| def _derive_key(master_secret: str, salt: Optional[bytes] = None) -> tuple[bytes, bytes]: | |
| """Derive a 256-bit encryption key from master secret using PBKDF2.""" | |
| if salt is None: | |
| salt = os.urandom(16) | |
| key = hashlib.pbkdf2_hmac( | |
| "sha256", | |
| master_secret.encode("utf-8"), | |
| salt, | |
| iterations=settings.encryption_kdf_iterations, | |
| dklen=_KEY_SIZE, | |
| ) | |
| return key, salt | |
| def _get_encryption_key() -> bytes: | |
| """Get or derive the encryption key from settings. | |
| Uses a fixed salt derived from the secret itself for deterministic key derivation, | |
| so the same key is produced across restarts without storing the salt separately. | |
| """ | |
| secret = settings.encryption_master_key | |
| if not secret or secret == "CHANGE_ME_IN_PRODUCTION": | |
| raise RuntimeError( | |
| "ENCRYPTION_MASTER_KEY must be set to a strong random secret for HIPAA encryption at rest." | |
| ) | |
| # Deterministic salt from HMAC of the secret — same key every time | |
| fixed_salt = hmac.new(secret.encode(), b"hipaa-encryption-salt", hashlib.sha256).digest()[:16] | |
| key, _ = _derive_key(secret, salt=fixed_salt) | |
| return key | |
| def encrypt_data(plaintext: str) -> str: | |
| """Encrypt a plaintext string using AES-256-GCM. | |
| Returns a base64-encoded string containing version, nonce, tag, and ciphertext. | |
| Returns the original string if encryption is disabled or plaintext is empty. | |
| """ | |
| if not settings.encryption_at_rest_enabled: | |
| return plaintext | |
| if not plaintext: | |
| return plaintext | |
| try: | |
| from cryptography.hazmat.primitives.ciphers.aead import AESGCM | |
| except ImportError: | |
| logger.error("cryptography package required for encryption. Install with: pip install cryptography") | |
| raise RuntimeError("cryptography package is required for HIPAA encryption at rest") | |
| key = _get_encryption_key() | |
| nonce = os.urandom(_NONCE_SIZE) | |
| aesgcm = AESGCM(key) | |
| ciphertext = aesgcm.encrypt(nonce, plaintext.encode("utf-8"), None) | |
| # ciphertext from AESGCM includes the 16-byte tag appended | |
| # Pack: VERSION || NONCE || CIPHERTEXT_WITH_TAG | |
| packed = struct.pack("B", _VERSION) + nonce + ciphertext | |
| return base64.b64encode(packed).decode("ascii") | |
| def decrypt_data(encoded: str) -> str: | |
| """Decrypt a base64-encoded AES-256-GCM ciphertext. | |
| Returns the original plaintext string. | |
| Returns the input unchanged if encryption is disabled or data is not encrypted. | |
| """ | |
| if not settings.encryption_at_rest_enabled: | |
| return encoded | |
| if not encoded: | |
| return encoded | |
| # Check if data looks like base64-encoded encrypted data | |
| try: | |
| raw = base64.b64decode(encoded) | |
| except Exception: | |
| # Not base64 — return as-is (unencrypted legacy data) | |
| return encoded | |
| if len(raw) < 1 + _NONCE_SIZE + _TAG_SIZE: | |
| # Too short to be encrypted data — return as-is | |
| return encoded | |
| version = raw[0] | |
| if version != _VERSION: | |
| # Unknown version or unencrypted data — return as-is | |
| return encoded | |
| try: | |
| from cryptography.hazmat.primitives.ciphers.aead import AESGCM | |
| except ImportError: | |
| raise RuntimeError("cryptography package is required for HIPAA decryption") | |
| nonce = raw[1:1 + _NONCE_SIZE] | |
| ciphertext_with_tag = raw[1 + _NONCE_SIZE:] | |
| key = _get_encryption_key() | |
| aesgcm = AESGCM(key) | |
| plaintext = aesgcm.decrypt(nonce, ciphertext_with_tag, None) | |
| return plaintext.decode("utf-8") | |
| def encrypt_bytes(data: bytes) -> bytes: | |
| """Encrypt raw bytes (e.g., audio files) using AES-256-GCM. | |
| Returns encrypted bytes with version, nonce, and tag prepended. | |
| """ | |
| if not settings.encryption_at_rest_enabled: | |
| return data | |
| if not data: | |
| return data | |
| try: | |
| from cryptography.hazmat.primitives.ciphers.aead import AESGCM | |
| except ImportError: | |
| raise RuntimeError("cryptography package is required for HIPAA encryption at rest") | |
| key = _get_encryption_key() | |
| nonce = os.urandom(_NONCE_SIZE) | |
| aesgcm = AESGCM(key) | |
| ciphertext = aesgcm.encrypt(nonce, data, None) | |
| return struct.pack("B", _VERSION) + nonce + ciphertext | |
| def decrypt_bytes(data: bytes) -> bytes: | |
| """Decrypt raw bytes encrypted with encrypt_bytes.""" | |
| if not settings.encryption_at_rest_enabled: | |
| return data | |
| if not data or len(data) < 1 + _NONCE_SIZE + _TAG_SIZE: | |
| return data | |
| version = data[0] | |
| if version != _VERSION: | |
| return data | |
| try: | |
| from cryptography.hazmat.primitives.ciphers.aead import AESGCM | |
| except ImportError: | |
| raise RuntimeError("cryptography package is required for HIPAA decryption") | |
| nonce = data[1:1 + _NONCE_SIZE] | |
| ciphertext_with_tag = data[1 + _NONCE_SIZE:] | |
| key = _get_encryption_key() | |
| aesgcm = AESGCM(key) | |
| return aesgcm.decrypt(nonce, ciphertext_with_tag, None) | |
| def is_encrypted(data: str) -> bool: | |
| """Check if a string appears to be encrypted data.""" | |
| try: | |
| raw = base64.b64decode(data) | |
| return len(raw) > 1 + _NONCE_SIZE + _TAG_SIZE and raw[0] == _VERSION | |
| except Exception: | |
| return False | |