Spaces:
Running
Running
| """ | |
| Encryption Service for Secure API Key Storage | |
| Provides proper encryption/decryption for sensitive data like API keys and secrets. | |
| Uses Fernet symmetric encryption with a derived key. | |
| """ | |
| import os | |
| import base64 | |
| import logging | |
| from typing import Optional | |
| try: | |
| from cryptography.fernet import Fernet | |
| from cryptography.hazmat.primitives import hashes | |
| from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC | |
| CRYPTOGRAPHY_AVAILABLE = True | |
| except ImportError: | |
| CRYPTOGRAPHY_AVAILABLE = False | |
# Module-level logger shared by everything in this file. The logger name is a
# fixed string rather than __name__.
logger = logging.getLogger("agent_monitoring_server.encryption")
class EncryptionService:
    """Encrypt and decrypt sensitive strings such as API keys.

    Fernet symmetric encryption is used when the ``cryptography`` package is
    importable and a usable key could be set up. Otherwise the service degrades
    to plain base64 *encoding*, which provides no confidentiality. Callers that
    must not store secrets unencrypted should check
    :meth:`is_encryption_available` first.
    """

    def __init__(self):
        # Fernet instance, or None when only the base64 fallback is usable.
        self._fernet = None
        self._initialize_encryption()

    def _initialize_encryption(self):
        """Set up ``self._fernet`` from the environment or a fresh random key.

        The key is read from the ``AGENT_GRAPH_ENCRYPTION_KEY`` environment
        variable. If that is unset or empty, a random key is generated; this
        code does not persist it, so a new key is produced on every
        initialization that finds no key in the environment.

        This method never raises: on any failure the error is logged and
        ``self._fernet`` is left as ``None`` (base64 fallback mode).
        """
        self._fernet = None

        # Check the import guard explicitly instead of relying on a NameError
        # from the missing Fernet symbol being swallowed further down.
        if not CRYPTOGRAPHY_AVAILABLE:
            logger.error(
                "Failed to initialize encryption service: "
                "cryptography package is not installed"
            )
            return

        try:
            encryption_key = os.environ.get('AGENT_GRAPH_ENCRYPTION_KEY')
            if not encryption_key:
                logger.warning("No encryption key found in environment. Generating a new one.")
                logger.warning("Set AGENT_GRAPH_ENCRYPTION_KEY environment variable for persistence.")
                # A Fernet key is just 32 random bytes, url-safe base64 encoded.
                # Stretching already-random bytes through a KDF adds nothing.
                encryption_key = self.generate_key()
                logger.info("Generated new encryption key. Consider setting AGENT_GRAPH_ENCRYPTION_KEY.")

            self._fernet = Fernet(encryption_key.encode('utf-8'))
            logger.info("Encryption service initialized successfully")
        except Exception as e:
            # Deliberately broad: a malformed key must not prevent the module
            # from importing; the service falls back to base64 instead.
            logger.error("Failed to initialize encryption service: %s", e)
            self._fernet = None

    def encrypt(self, plaintext: str) -> str:
        """Encrypt a plaintext string.

        Args:
            plaintext: The string to encrypt.

        Returns:
            ``""`` for empty input. Otherwise the Fernet token wrapped in one
            extra layer of url-safe base64 (kept for compatibility with data
            already stored in that format). If encryption is unavailable or
            fails, the standard-base64 encoding of the plaintext is returned.
        """
        if not plaintext:
            return ""

        data = plaintext.encode('utf-8')
        if self._fernet is not None:
            try:
                token = self._fernet.encrypt(data)
                return base64.urlsafe_b64encode(token).decode('utf-8')
            except Exception as e:
                # Best-effort: log and fall through to the base64 fallback.
                logger.error("Encryption failed: %s", e)
        else:
            logger.warning("Using base64 fallback - encryption not available")

        return base64.b64encode(data).decode('utf-8')

    def decrypt(self, encrypted_text: str) -> str:
        """Decrypt a string produced by :meth:`encrypt`.

        Args:
            encrypted_text: The encrypted (or base64-encoded) string.

        Returns:
            ``""`` for empty input. Otherwise the Fernet-decrypted plaintext
            when that succeeds, else the plain base64 decoding of the input,
            else the input unchanged when every decoding attempt fails.
        """
        if not encrypted_text:
            return ""

        try:
            raw = encrypted_text.encode('utf-8')
            if self._fernet is not None:
                try:
                    token = base64.urlsafe_b64decode(raw)
                    return self._fernet.decrypt(token).decode('utf-8')
                except Exception:
                    # Value may have been written in base64 fallback mode.
                    # NOTE(review): a real ciphertext encrypted under a
                    # different key also lands here and may decode to
                    # non-plaintext; confirm callers tolerate that.
                    logger.warning("Proper decryption failed, trying base64 fallback")
            return base64.b64decode(raw).decode('utf-8')
        except Exception as e:
            logger.error("Decryption failed: %s", e)
            # Return the original text if all decryption methods fail.
            return encrypted_text

    def is_encryption_available(self) -> bool:
        """Return True when a Fernet instance is set up (not fallback mode)."""
        return self._fernet is not None

    def generate_key(self) -> str:
        """Generate a new Fernet-compatible encryption key.

        Returns:
            32 bytes from ``os.urandom``, url-safe base64 encoded, as ``str``.
            Built from the standard library only, so it also works when the
            ``cryptography`` package is not installed.
        """
        return base64.urlsafe_b64encode(os.urandom(32)).decode('utf-8')
# Global encryption service instance, created once when this module is
# imported. Constructing it runs EncryptionService.__init__, which calls
# _initialize_encryption(), so key setup and its log output happen at import.
encryption_service = EncryptionService()