File size: 6,106 Bytes
f2c6053 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 | """
Encryption Utility for AegisLM
Provides AES-256 encryption for sensitive fields at rest.
"""
import base64
import hashlib
import os
from typing import Optional, Tuple
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import padding
from security.secret_manager import get_encryption_key
class EncryptionService:
    """
    Shared AES-256 encryptor for sensitive values stored at rest.

    Payloads are encrypted with AES in CBC mode using PKCS7 padding. The key
    is the SHA-256 digest of the string returned by ``get_encryption_key()``.
    Each result is the base64 encoding of a fresh 16-byte IV followed by the
    encrypted payload. No authentication tag is produced or checked here.
    """

    # The single shared instance, populated on first construction.
    _instance: Optional["EncryptionService"] = None

    def __new__(cls) -> "EncryptionService":
        """Return the shared instance, creating it on first use."""
        existing = cls._instance
        if existing is not None:
            return existing
        cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        """Derive and store the key the first time the instance is set up."""
        # __init__ runs on every EncryptionService() call, even though
        # __new__ hands back the same object, so skip repeat set-up.
        if hasattr(self, "_initialized"):
            return
        secret = get_encryption_key()
        self._key = self._derive_key(secret)
        self._initialized = True

    def _derive_key(self, key_input: str) -> bytes:
        """
        Turn the configured key string into a 256-bit AES key.

        Args:
            key_input: Input key string

        Returns:
            32-byte SHA-256 digest of the encoded input
        """
        hasher = hashlib.sha256()
        hasher.update(key_input.encode())
        return hasher.digest()

    def _pad(self, data: bytes) -> bytes:
        """
        Apply PKCS7 padding sized to the AES block.

        Args:
            data: Data to pad

        Returns:
            Padded data
        """
        block_bits = algorithms.AES.block_size
        padder = padding.PKCS7(block_bits).padder()
        head = padder.update(data)
        tail = padder.finalize()
        return head + tail

    def _unpad(self, data: bytes) -> bytes:
        """
        Strip PKCS7 padding sized to the AES block.

        Args:
            data: Padded data

        Returns:
            Unpadded data
        """
        block_bits = algorithms.AES.block_size
        unpadder = padding.PKCS7(block_bits).unpadder()
        head = unpadder.update(data)
        tail = unpadder.finalize()
        return head + tail

    def encrypt(self, plaintext: str) -> str:
        """
        Encrypt a string with AES-256-CBC under a fresh random IV.

        Args:
            plaintext: Plain text to encrypt

        Returns:
            Base64-encoded ciphertext (IV + ciphertext)
        """
        # A new 16-byte IV is drawn for every call.
        iv = os.urandom(16)
        aes_cbc = Cipher(
            algorithms.AES(self._key),
            modes.CBC(iv),
            backend=default_backend(),
        )
        encryptor = aes_cbc.encryptor()

        padded = self._pad(plaintext.encode("utf-8"))
        body = encryptor.update(padded) + encryptor.finalize()

        # The IV travels in front of the payload so decrypt() can recover it.
        return base64.b64encode(iv + body).decode("utf-8")

    def decrypt(self, ciphertext: str) -> str:
        """
        Reverse ``encrypt``: decode, split off the IV, decrypt and unpad.

        Args:
            ciphertext: Base64-encoded ciphertext (IV + ciphertext)

        Returns:
            Decrypted plaintext
        """
        raw = base64.b64decode(ciphertext)

        # The first 16 bytes are the IV; the remainder is the payload.
        iv, body = raw[:16], raw[16:]

        aes_cbc = Cipher(
            algorithms.AES(self._key),
            modes.CBC(iv),
            backend=default_backend(),
        )
        decryptor = aes_cbc.decryptor()

        padded = decryptor.update(body) + decryptor.finalize()
        return self._unpad(padded).decode("utf-8")

    def encrypt_bytes(self, data: bytes) -> str:
        """
        Encrypt raw bytes.

        The bytes are base64-encoded to text first and that text is passed
        to ``encrypt``.

        Args:
            data: Bytes to encrypt

        Returns:
            Base64-encoded ciphertext
        """
        as_text = base64.b64encode(data).decode("utf-8")
        return self.encrypt(as_text)

    def decrypt_bytes(self, ciphertext: str) -> bytes:
        """
        Decrypt a value produced by ``encrypt_bytes`` back to raw bytes.

        Args:
            ciphertext: Base64-encoded ciphertext

        Returns:
            Decrypted bytes
        """
        return base64.b64decode(self.decrypt(ciphertext))
def get_encryption_service() -> EncryptionService:
    """
    Return the shared encryption service.

    Returns:
        The EncryptionService instance obtained by calling the class
    """
    service = EncryptionService()
    return service
# Convenience functions
def encrypt(plaintext: str) -> str:
    """Encrypt plaintext with the shared encryption service."""
    service = get_encryption_service()
    return service.encrypt(plaintext)
def decrypt(ciphertext: str) -> str:
    """Decrypt ciphertext with the shared encryption service."""
    service = get_encryption_service()
    return service.decrypt(ciphertext)
# =============================================================================
# Password hashing (using bcrypt)
# =============================================================================
def hash_password(password: str) -> str:
    """
    Produce a bcrypt hash of a password using a newly generated salt.

    Args:
        password: Plain text password

    Returns:
        bcrypt hash, decoded to a UTF-8 string
    """
    import bcrypt

    secret = password.encode("utf-8")
    salt = bcrypt.gensalt()
    digest = bcrypt.hashpw(secret, salt)
    return digest.decode("utf-8")
def verify_password(password: str, hashed: str) -> bool:
    """
    Check a plain text password against a stored bcrypt hash.

    Args:
        password: Plain text password
        hashed: bcrypt hash

    Returns:
        True if password matches; False if it does not match or if the
        check raises any exception
    """
    import bcrypt

    try:
        candidate = password.encode("utf-8")
        stored = hashed.encode("utf-8")
        matches = bcrypt.checkpw(candidate, stored)
    except Exception:
        # Any failure during the check is reported as a non-match.
        return False
    return matches
|