File size: 6,049 Bytes
60d4850
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
"""
AES-256-GCM Encryption at Rest for HIPAA Compliance

Provides:
- AES-256-GCM authenticated encryption for PHI data (transcripts, audio, SOAP notes)
- Key derivation from a master secret using PBKDF2-HMAC-SHA256
- Unique nonce per encryption operation (12-byte random)
- Base64-encoded ciphertext for safe storage in text DB columns

Security properties:
- Confidentiality + integrity (GCM authentication tag)
- No nonce reuse (random 12 bytes per operation)
- Key stretching via PBKDF2 with configurable iterations
"""

import base64
import hashlib
import hmac
import os
import struct
import logging
from typing import Optional

from app.config import settings

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Serialized layout: VERSION(1) || NONCE(12) || CIPHERTEXT(variable) || TAG(16)
# The encrypt functions store the AESGCM output (ciphertext with the 16-byte
# tag appended) directly after the nonce, so the tag sits at the END.
_VERSION = 1  # format version byte written first and checked before decrypting
_NONCE_SIZE = 12  # length in bytes of the random nonce drawn per encryption
_TAG_SIZE = 16  # GCM tag length in bytes; used in the minimum-length checks
_KEY_SIZE = 32  # 256 bits


def _derive_key(master_secret: str, salt: Optional[bytes] = None) -> tuple[bytes, bytes]:
    """Stretch ``master_secret`` into an AES-256 key with PBKDF2-HMAC-SHA256.

    Args:
        master_secret: Secret text; it is UTF-8 encoded before derivation.
        salt: Salt bytes to use. When ``None``, 16 random bytes are drawn
            from ``os.urandom``.

    Returns:
        A ``(key, salt)`` pair: the ``_KEY_SIZE``-byte derived key and the
        salt that was actually used (useful when the salt was generated here).
    """
    effective_salt = os.urandom(16) if salt is None else salt
    password = master_secret.encode("utf-8")
    derived = hashlib.pbkdf2_hmac(
        "sha256",
        password,
        effective_salt,
        # Work factor comes from configuration rather than being hard-coded.
        iterations=settings.encryption_kdf_iterations,
        dklen=_KEY_SIZE,
    )
    return derived, effective_salt


# Single-entry memo of the derived key, keyed by (master secret, KDF iterations).
# PBKDF2 is deliberately slow, and the inputs below are fully deterministic, so
# re-deriving on every encrypt/decrypt call only burns CPU. The derived key is
# no more sensitive than the master secret that already lives in ``settings``.
_DERIVED_KEY_CACHE: dict[tuple, bytes] = {}


def _get_encryption_key() -> bytes:
    """Return the AES-256 key derived from ``settings.encryption_master_key``.

    The salt is the first 16 bytes of HMAC-SHA256 keyed with the secret over a
    fixed label, so the same key is produced across restarts without storing
    a salt separately. The derived key is memoised per (secret, iteration
    count); changing either setting triggers a fresh derivation.

    Returns:
        The ``_KEY_SIZE``-byte key produced by ``_derive_key``.

    Raises:
        RuntimeError: If the master key is unset/empty or still equals the
            ``"CHANGE_ME_IN_PRODUCTION"`` placeholder.
    """
    secret = settings.encryption_master_key
    if not secret or secret == "CHANGE_ME_IN_PRODUCTION":
        raise RuntimeError(
            "ENCRYPTION_MASTER_KEY must be set to a strong random secret for HIPAA encryption at rest."
        )

    cache_key = (secret, settings.encryption_kdf_iterations)
    cached = _DERIVED_KEY_CACHE.get(cache_key)
    if cached is not None:
        return cached

    # Deterministic salt from HMAC of the secret — same key every time
    fixed_salt = hmac.new(secret.encode(), b"hipaa-encryption-salt", hashlib.sha256).digest()[:16]
    key, _ = _derive_key(secret, salt=fixed_salt)

    # Keep at most one entry so a rotated secret does not leave the old key
    # lingering in memory.
    _DERIVED_KEY_CACHE.clear()
    _DERIVED_KEY_CACHE[cache_key] = key
    return key


def encrypt_data(plaintext: str) -> str:
    """Seal a text value with AES-256-GCM and return it base64-encoded.

    The encoded payload is ``VERSION || NONCE || CIPHERTEXT_WITH_TAG``, where
    the nonce is ``_NONCE_SIZE`` fresh random bytes.

    Args:
        plaintext: Text to protect; it is UTF-8 encoded before encryption.

    Returns:
        The ASCII base64 string of the packed payload, or ``plaintext``
        unchanged when encryption at rest is disabled or the value is empty.

    Raises:
        RuntimeError: If the ``cryptography`` package cannot be imported, or
            if ``_get_encryption_key`` rejects the configured master key.
    """
    # Pass-through: feature switched off, or nothing to protect.
    if not (settings.encryption_at_rest_enabled and plaintext):
        return plaintext

    try:
        from cryptography.hazmat.primitives.ciphers.aead import AESGCM
    except ImportError:
        logger.error("cryptography package required for encryption. Install with: pip install cryptography")
        raise RuntimeError("cryptography package is required for HIPAA encryption at rest")

    aes_key = _get_encryption_key()
    iv = os.urandom(_NONCE_SIZE)
    # AESGCM.encrypt returns the ciphertext with the 16-byte tag appended.
    sealed = AESGCM(aes_key).encrypt(iv, plaintext.encode("utf-8"), None)

    envelope = b"".join((struct.pack("B", _VERSION), iv, sealed))
    return base64.b64encode(envelope).decode("ascii")


def decrypt_data(encoded: str) -> str:
    """Decrypt a base64-encoded AES-256-GCM payload produced by ``encrypt_data``.

    Values that do not look like an encrypted payload are treated as
    unencrypted legacy data and returned unchanged. Base64 decoding is strict
    (``validate=True``): the lenient default silently discards characters
    outside the base64 alphabet, so ordinary prose such as "After ..." or
    "Abdominal ..." could decode to bytes starting with the version byte and
    then fail inside AES-GCM instead of being passed through.

    Args:
        encoded: Stored value; either a payload from ``encrypt_data`` or
            legacy plaintext.

    Returns:
        The decrypted UTF-8 text, or ``encoded`` unchanged when encryption at
        rest is disabled, the value is empty, it is not strict base64, it is
        shorter than ``1 + _NONCE_SIZE + _TAG_SIZE`` bytes once decoded, or
        its first byte is not ``_VERSION``.

    Raises:
        RuntimeError: If the ``cryptography`` package cannot be imported, or
            if ``_get_encryption_key`` rejects the configured master key.

    Any exception raised by ``AESGCM.decrypt`` (for example when the payload
    fails authentication) is deliberately left to propagate rather than being
    swallowed.
    """
    if not settings.encryption_at_rest_enabled or not encoded:
        return encoded

    try:
        # binascii.Error is a ValueError subclass; non-ASCII text raises
        # ValueError and non-string input raises TypeError.
        raw = base64.b64decode(encoded, validate=True)
    except (ValueError, TypeError):
        # Not base64 — return as-is (unencrypted legacy data)
        return encoded

    if len(raw) < 1 + _NONCE_SIZE + _TAG_SIZE or raw[0] != _VERSION:
        # Too short, or unknown version byte — treat as unencrypted data.
        return encoded

    try:
        from cryptography.hazmat.primitives.ciphers.aead import AESGCM
    except ImportError as exc:
        raise RuntimeError("cryptography package is required for HIPAA decryption") from exc

    nonce = raw[1:1 + _NONCE_SIZE]
    ciphertext_with_tag = raw[1 + _NONCE_SIZE:]

    aesgcm = AESGCM(_get_encryption_key())
    return aesgcm.decrypt(nonce, ciphertext_with_tag, None).decode("utf-8")


def encrypt_bytes(data: bytes) -> bytes:
    """Seal a binary blob (e.g., audio files) with AES-256-GCM.

    Args:
        data: Raw bytes to protect.

    Returns:
        ``VERSION || NONCE || CIPHERTEXT_WITH_TAG`` as raw bytes, or ``data``
        unchanged when encryption at rest is disabled or the blob is empty.

    Raises:
        RuntimeError: If the ``cryptography`` package cannot be imported, or
            if ``_get_encryption_key`` rejects the configured master key.
    """
    # Pass-through: feature switched off, or nothing to protect.
    if not (settings.encryption_at_rest_enabled and data):
        return data

    try:
        from cryptography.hazmat.primitives.ciphers.aead import AESGCM
    except ImportError:
        raise RuntimeError("cryptography package is required for HIPAA encryption at rest")

    aes_key = _get_encryption_key()
    iv = os.urandom(_NONCE_SIZE)
    # AESGCM.encrypt returns the ciphertext with the tag appended.
    sealed = AESGCM(aes_key).encrypt(iv, data, None)
    return b"".join((struct.pack("B", _VERSION), iv, sealed))


def decrypt_bytes(data: bytes) -> bytes:
    """Reverse ``encrypt_bytes`` for a binary blob.

    Args:
        data: Bytes laid out as ``VERSION || NONCE || CIPHERTEXT_WITH_TAG``,
            or an unencrypted blob.

    Returns:
        The decrypted bytes, or ``data`` unchanged when encryption at rest is
        disabled, the blob is empty or shorter than the fixed overhead, or
        its first byte is not ``_VERSION``.

    Raises:
        RuntimeError: If the ``cryptography`` package cannot be imported, or
            if ``_get_encryption_key`` rejects the configured master key.
    """
    if not settings.encryption_at_rest_enabled:
        return data

    header_len = 1 + _NONCE_SIZE
    # NOTE(review): detection relies only on length and the first byte, so an
    # unencrypted blob that happens to start with the version byte will be
    # handed to AES-GCM — confirm callers never mix such data in.
    if not data or len(data) < header_len + _TAG_SIZE or data[0] != _VERSION:
        return data

    try:
        from cryptography.hazmat.primitives.ciphers.aead import AESGCM
    except ImportError:
        raise RuntimeError("cryptography package is required for HIPAA decryption")

    iv = data[1:header_len]
    sealed = data[header_len:]

    aes_key = _get_encryption_key()
    return AESGCM(aes_key).decrypt(iv, sealed, None)


def is_encrypted(data: str) -> bool:
    """Report whether ``data`` looks like a payload produced by ``encrypt_data``.

    This is a structural heuristic only: the value must be strict base64,
    decode to more than ``1 + _NONCE_SIZE + _TAG_SIZE`` bytes, and start with
    the ``_VERSION`` byte. No decryption or authentication is attempted.

    Decoding uses ``validate=True`` because the lenient default silently drops
    characters outside the base64 alphabet, which lets ordinary prose (spaces,
    punctuation) decode "successfully" and be misreported as encrypted.

    Args:
        data: Candidate stored value.

    Returns:
        ``True`` if the value has the encrypted-payload shape, else ``False``
        (including for non-base64, non-ASCII, or non-string input).
    """
    try:
        # binascii.Error is a ValueError subclass; non-ASCII text raises
        # ValueError and non-string input (e.g. None) raises TypeError.
        raw = base64.b64decode(data, validate=True)
    except (ValueError, TypeError):
        return False
    return len(raw) > 1 + _NONCE_SIZE + _TAG_SIZE and raw[0] == _VERSION