corpusdb / app /encryption.py
mrsavage1's picture
Upload 62 files
ea520d9 verified
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC as PBKDF2
import base64
import os
from app.config import settings
class EncryptionManager:
"""Encrypt sensitive data at rest"""
def __init__(self):
self._cipher = None
self._key = None
def _get_key(self) -> bytes:
"""Derive encryption key from JWT secret"""
if self._key is not None:
return self._key
# Use JWT secret as password
password = settings.jwt_secret.encode()
salt = os.urandom(16)
# Derive key using PBKDF2
kdf = PBKDF2(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=600000,
)
key = base64.urlsafe_b64encode(kdf.derive(password))
self._key = key
return key
def _get_cipher(self) -> Fernet:
"""Get Fernet cipher instance"""
if not self._cipher:
self._cipher = Fernet(self._get_key())
return self._cipher
def encrypt(self, data: str) -> str:
"""Encrypt string data"""
if not data:
return data
cipher = self._get_cipher()
encrypted = cipher.encrypt(data.encode())
return base64.urlsafe_b64encode(encrypted).decode()
def decrypt(self, encrypted_data: str) -> str:
"""Decrypt string data"""
if not encrypted_data:
return encrypted_data
try:
cipher = self._get_cipher()
decoded = base64.urlsafe_b64decode(encrypted_data.encode())
decrypted = cipher.decrypt(decoded)
return decrypted.decode()
except Exception as e:
raise ValueError(f"Decryption failed: {e}")
def encrypt_dict(self, data: dict, fields: list) -> dict:
"""Encrypt specific fields in a dictionary"""
encrypted = data.copy()
for field in fields:
if field in encrypted and encrypted[field]:
encrypted[field] = self.encrypt(str(encrypted[field]))
return encrypted
def decrypt_dict(self, data: dict, fields: list) -> dict:
"""Decrypt specific fields in a dictionary"""
decrypted = data.copy()
for field in fields:
if field in decrypted and decrypted[field]:
try:
decrypted[field] = self.decrypt(decrypted[field])
except:
pass # Field might not be encrypted
return decrypted
def hash_sensitive_data(self, data: str) -> str:
"""One-way hash for sensitive data (like API keys for comparison)"""
import hashlib
return hashlib.sha256(data.encode()).hexdigest()
encryption_manager = EncryptionManager()
class SecureStorage:
"""Secure storage for sensitive configuration"""
def __init__(self):
self.encryption = encryption_manager
def store_hf_credentials(self, user_id: str, token: str, repo: str) -> dict:
"""Store HuggingFace credentials securely"""
return {
'user_id': user_id,
'hf_token': self.encryption.encrypt(token) if token else None,
'hf_repo': repo # Repo name is not sensitive
}
def retrieve_hf_credentials(self, stored_data: dict) -> dict:
"""Retrieve and decrypt HuggingFace credentials"""
return {
'user_id': stored_data.get('user_id'),
'hf_token': self.encryption.decrypt(stored_data['hf_token']) if stored_data.get('hf_token') else None,
'hf_repo': stored_data.get('hf_repo')
}
def mask_sensitive_data(self, data: str, visible_chars: int = 4) -> str:
"""Mask sensitive data for display (e.g., API keys)"""
if not data or len(data) <= visible_chars:
return '*' * len(data) if data else ''
return data[:visible_chars] + '*' * (len(data) - visible_chars)
secure_storage = SecureStorage()