""" Encryption Service for Secure API Key Storage Provides proper encryption/decryption for sensitive data like API keys and secrets. Uses Fernet symmetric encryption with a derived key. """ import os import base64 import logging from typing import Optional try: from cryptography.fernet import Fernet from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC CRYPTOGRAPHY_AVAILABLE = True except ImportError: CRYPTOGRAPHY_AVAILABLE = False logger = logging.getLogger("agent_monitoring_server.encryption") class EncryptionService: """Service for encrypting and decrypting sensitive data""" def __init__(self): self._fernet = None self._initialize_encryption() def _initialize_encryption(self): """Initialize the encryption key from environment or generate one""" try: # Try to get encryption key from environment encryption_key = os.environ.get('AGENT_GRAPH_ENCRYPTION_KEY') if not encryption_key: # Generate a new key if none exists logger.warning("No encryption key found in environment. Generating a new one.") logger.warning("Set AGENT_GRAPH_ENCRYPTION_KEY environment variable for persistence.") # Generate a random password and salt for key derivation password = os.urandom(32) salt = os.urandom(16) # Derive key using PBKDF2 kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, salt=salt, iterations=100000, ) key = base64.urlsafe_b64encode(kdf.derive(password)) encryption_key = key.decode('utf-8') # Store the key (in production, this should be handled more securely) logger.info("Generated new encryption key. Consider setting AGENT_GRAPH_ENCRYPTION_KEY.") # Create Fernet instance self._fernet = Fernet(encryption_key.encode('utf-8')) logger.info("Encryption service initialized successfully") except Exception as e: logger.error(f"Failed to initialize encryption service: {str(e)}") # Fallback to base64 if encryption fails self._fernet = None def encrypt(self, plaintext: str) -> str: """ Encrypt a plaintext string Args: plaintext: The string to encrypt Returns: Encrypted string (base64 encoded) """ if not plaintext: return "" try: if self._fernet: # Use proper encryption encrypted_bytes = self._fernet.encrypt(plaintext.encode('utf-8')) return base64.urlsafe_b64encode(encrypted_bytes).decode('utf-8') else: # Fallback to base64 encoding logger.warning("Using base64 fallback - encryption not available") return base64.b64encode(plaintext.encode('utf-8')).decode('utf-8') except Exception as e: logger.error(f"Encryption failed: {str(e)}") # Fallback to base64 return base64.b64encode(plaintext.encode('utf-8')).decode('utf-8') def decrypt(self, encrypted_text: str) -> str: """ Decrypt an encrypted string Args: encrypted_text: The encrypted string to decrypt Returns: Decrypted plaintext string """ if not encrypted_text: return "" try: if self._fernet: # Try proper decryption first try: encrypted_bytes = base64.urlsafe_b64decode(encrypted_text.encode('utf-8')) decrypted_bytes = self._fernet.decrypt(encrypted_bytes) return decrypted_bytes.decode('utf-8') except Exception: # If proper decryption fails, try base64 fallback logger.warning("Proper decryption failed, trying base64 fallback") return base64.b64decode(encrypted_text.encode('utf-8')).decode('utf-8') else: # Use base64 decoding return base64.b64decode(encrypted_text.encode('utf-8')).decode('utf-8') except Exception as e: logger.error(f"Decryption failed: {str(e)}") # Return the original text if all decryption methods fail return encrypted_text def is_encryption_available(self) -> bool: """Check if proper encryption is available""" return self._fernet is not None def generate_key(self) -> str: """Generate a new encryption key""" return Fernet.generate_key().decode('utf-8') # Global encryption service instance encryption_service = EncryptionService()