Spaces:
Running
Running
File size: 5,199 Bytes
c2ea5ed |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
"""
Encryption Service for Secure API Key Storage
Provides proper encryption/decryption for sensitive data like API keys and secrets.
Uses Fernet symmetric encryption with a derived key.
"""
import os
import base64
import logging
from typing import Optional
# ``cryptography`` is an optional dependency: record whether it could be
# imported so the rest of the module can degrade gracefully instead of
# failing at import time.
try:
    from cryptography.fernet import Fernet
    from cryptography.hazmat.primitives import hashes
    from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
    CRYPTOGRAPHY_AVAILABLE = True
except ImportError:
    # Fernet, hashes and PBKDF2HMAC are NOT defined in this case.
    CRYPTOGRAPHY_AVAILABLE = False

# Module-level logger shared by everything in this file.
logger = logging.getLogger("agent_monitoring_server.encryption")
class EncryptionService:
    """Encrypt and decrypt sensitive strings such as API keys and secrets.

    When a Fernet cipher could be created, values are protected with Fernet
    symmetric encryption and the resulting token is wrapped in one more
    URL-safe base64 layer. When no cipher is available the service degrades
    to plain base64 *encoding*, which provides no confidentiality; call
    :meth:`is_encryption_available` to find out which mode is active.
    """

    def __init__(self):
        # Fernet instance, or None when only the base64 fallback is usable.
        self._fernet = None
        self._initialize_encryption()

    def _initialize_encryption(self):
        """Create the Fernet cipher from the environment or from a fresh key.

        The key is read from the ``AGENT_GRAPH_ENCRYPTION_KEY`` environment
        variable. When that variable is unset or empty, a random key is
        generated and kept only on this instance, so anything encrypted with
        it cannot be decrypted once the instance is gone.

        Never raises: every failure is logged and leaves ``self._fernet`` set
        to ``None`` (base64 fallback mode).
        """
        if not CRYPTOGRAPHY_AVAILABLE:
            # Say so explicitly instead of tripping over an undefined
            # ``Fernet`` name further down.
            logger.error(
                "Failed to initialize encryption service: "
                "cryptography package is not installed"
            )
            self._fernet = None
            return

        try:
            encryption_key = os.environ.get('AGENT_GRAPH_ENCRYPTION_KEY')

            if not encryption_key:
                logger.warning("No encryption key found in environment. Generating a new one.")
                logger.warning("Set AGENT_GRAPH_ENCRYPTION_KEY environment variable for persistence.")
                # 32 bytes from the OS CSPRNG already form a full-strength
                # key; stretching random bytes with PBKDF2 adds only cost.
                encryption_key = self.generate_key()
                logger.info("Generated new encryption key. Consider setting AGENT_GRAPH_ENCRYPTION_KEY.")

            self._fernet = Fernet(encryption_key.encode('utf-8'))
            logger.info("Encryption service initialized successfully")
        except Exception as e:
            # Typically a malformed key in the environment variable.
            logger.error("Failed to initialize encryption service: %s", e)
            self._fernet = None

    @staticmethod
    def _encode_fallback(plaintext: str) -> str:
        """Return *plaintext* as standard base64 text (no confidentiality)."""
        return base64.b64encode(plaintext.encode('utf-8')).decode('utf-8')

    @staticmethod
    def _decode_fallback(encoded_text: str) -> str:
        """Reverse :meth:`_encode_fallback`; raises on invalid base64/UTF-8."""
        return base64.b64decode(encoded_text.encode('utf-8')).decode('utf-8')

    def encrypt(self, plaintext: str) -> str:
        """
        Encrypt a plaintext string

        Args:
            plaintext: The string to encrypt

        Returns:
            The Fernet token wrapped in URL-safe base64 when a cipher is
            available; otherwise (or if encryption raises) the plain base64
            encoding of ``plaintext``. An empty or ``None`` input yields "".
        """
        if not plaintext:
            return ""

        try:
            if self._fernet is not None:
                token = self._fernet.encrypt(plaintext.encode('utf-8'))
                # The extra base64 layer is part of the stored format and
                # is what decrypt() expects, so it must stay.
                return base64.urlsafe_b64encode(token).decode('utf-8')

            logger.warning("Using base64 fallback - encryption not available")
            return self._encode_fallback(plaintext)
        except Exception as e:
            logger.error("Encryption failed: %s", e)
            # Best-effort by design: the caller still gets a storable value,
            # but it is only encoded, not encrypted.
            return self._encode_fallback(plaintext)

    def decrypt(self, encrypted_text: str) -> str:
        """
        Decrypt an encrypted string

        Args:
            encrypted_text: The encrypted string to decrypt

        Returns:
            The decrypted plaintext. If Fernet decryption is unavailable or
            fails, the value is treated as plain base64; if that fails too,
            ``encrypted_text`` is returned unchanged. An empty or ``None``
            input yields "".

        NOTE(review): a value encrypted under a *different* key fails Fernet
        verification and then appears to decode successfully in the base64
        fallback, yielding the inner Fernet token text rather than an error.
        Callers that need to detect this should confirm and handle it.
        """
        if not encrypted_text:
            return ""

        try:
            if self._fernet is not None:
                try:
                    token = base64.urlsafe_b64decode(encrypted_text.encode('utf-8'))
                    return self._fernet.decrypt(token).decode('utf-8')
                except Exception:
                    # Fall through to the base64 path shared with the
                    # no-cipher mode.
                    logger.warning("Proper decryption failed, trying base64 fallback")

            return self._decode_fallback(encrypted_text)
        except Exception as e:
            logger.error("Decryption failed: %s", e)
            # Every strategy failed: hand back the input untouched.
            return encrypted_text

    def is_encryption_available(self) -> bool:
        """Return True when a Fernet cipher is in use, False in fallback mode."""
        return self._fernet is not None

    def generate_key(self) -> str:
        """Generate a new Fernet-compatible encryption key.

        A Fernet key is the URL-safe base64 encoding of 32 random bytes, so
        it is produced with the standard library alone and works even when
        the ``cryptography`` package is not installed.

        Returns:
            A 44-character key string suitable for the
            ``AGENT_GRAPH_ENCRYPTION_KEY`` environment variable.
        """
        return base64.urlsafe_b64encode(os.urandom(32)).decode('utf-8')
# Global encryption service instance, created once at import time and
# shared by every importer of this module.
encryption_service = EncryptionService()