| import os | |
| import json | |
| import hashlib | |
| from typing import Dict, Any, Tuple | |
| from cryptography.fernet import Fernet | |
| from ..protocols import Logger | |
| class EncryptionService: | |
| def __init__(self, logger: Logger): | |
| self._logger = logger | |
| self._encryption_key = self._get_or_create_encryption_key() | |
| self._cipher = Fernet(self._encryption_key) | |
| def _get_or_create_encryption_key(self) -> bytes: | |
| key_env = os.getenv("MCP_CREDENTIAL_ENCRYPTION_KEY") | |
| try: | |
| if isinstance(key_env, str): | |
| return key_env.encode('utf-8') | |
| else: | |
| return key_env | |
| except Exception as e: | |
| self._logger.error(f"Invalid encryption key: {e}") | |
| self._logger.warning("Generating new encryption key for this session") | |
| key = Fernet.generate_key() | |
| self._logger.info(f"Generated new encryption key. Set this in your environment:") | |
| self._logger.info(f"MCP_CREDENTIAL_ENCRYPTION_KEY={key.decode()}") | |
| return key | |
| def encrypt_config(self, config: Dict[str, Any]) -> Tuple[bytes, str]: | |
| config_json = json.dumps(config, sort_keys=True) | |
| config_bytes = config_json.encode('utf-8') | |
| config_hash = hashlib.sha256(config_bytes).hexdigest() | |
| encrypted_config = self._cipher.encrypt(config_bytes) | |
| return encrypted_config, config_hash | |
| def decrypt_config(self, encrypted_config: bytes, expected_hash: str) -> Dict[str, Any]: | |
| try: | |
| decrypted_bytes = self._cipher.decrypt(encrypted_config) | |
| actual_hash = hashlib.sha256(decrypted_bytes).hexdigest() | |
| if actual_hash != expected_hash: | |
| raise ValueError("Credential integrity check failed") | |
| config_json = decrypted_bytes.decode('utf-8') | |
| return json.loads(config_json) | |
| except Exception as e: | |
| self._logger.error(f"Failed to decrypt credential: {e}") | |
| raise ValueError("Failed to decrypt credential") |