""" Secrets Management for MCP Orchestration Platform Enterprise-grade secrets handling with encryption and access control """ import os import json import base64 import asyncio from typing import Dict, Any, Optional, Set, List from cryptography.fernet import Fernet from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC import structlog from datetime import datetime, timedelta import vault_secrets import boto3 from botocore.exceptions import ClientError # ============================================================================= # Secrets Storage Interfaces # ============================================================================= class SecretsStore(ABC): """Abstract interface for secrets storage backends.""" @abstractmethod async def get_secret(self, key: str) -> Optional[str]: """Retrieve a secret by key.""" pass @abstractmethod async def set_secret(self, key: str, value: str, ttl: Optional[int] = None) -> bool: """Store a secret with optional TTL.""" pass @abstractmethod async def delete_secret(self, key: str) -> bool: """Delete a secret.""" pass @abstractmethod async def list_secrets(self, prefix: str = "") -> List[str]: """List all secrets with optional prefix filter.""" pass # ============================================================================= # Local Encrypted File Storage # ============================================================================= class LocalSecretsStore(SecretsStore): """Local file-based secrets storage with encryption.""" def __init__(self, secret_key: bytes, storage_path: str = "./secrets"): self.secret_key = secret_key self.storage_path = storage_path self.logger = structlog.get_logger() # Initialize Fernet cipher self.cipher = Fernet(self.secret_key) # Load existing secrets self.secrets: Dict[str, Dict[str, Any]] = {} # Create storage directory os.makedirs(storage_path, exist_ok=True) # Load secrets on init self._load_secrets() def _generate_key_from_passphrase(self, passphrase: str, salt: bytes = None) -> bytes: """Generate encryption key from passphrase.""" if salt is None: salt = os.urandom(16) kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, salt=salt, iterations=100000, ) key = base64.urlsafe_b64encode(kdf.derive(passphrase.encode())) return key def _encrypt_data(self, data: str) -> str: """Encrypt sensitive data.""" try: return self.cipher.encrypt(data.encode()).decode() except Exception as e: self.logger.error("Encryption failed", error=str(e)) raise def _decrypt_data(self, encrypted_data: str) -> str: """Decrypt sensitive data.""" try: return self.cipher.decrypt(encrypted_data.encode()).decode() except Exception as e: self.logger.error("Decryption failed", error=str(e)) raise def _load_secrets(self): """Load secrets from encrypted storage.""" secrets_file = os.path.join(self.storage_path, "secrets.enc") if not os.path.exists(secrets_file): return try: with open(secrets_file, 'r') as f: encrypted_data = f.read() decrypted_data = self._decrypt_data(encrypted_data) self.secrets = json.loads(decrypted_data) self.logger.info("Secrets loaded", count=len(self.secrets)) except Exception as e: self.logger.error("Failed to load secrets", error=str(e)) self.secrets = {} def _save_secrets(self): """Save secrets to encrypted storage.""" secrets_file = os.path.join(self.storage_path, "secrets.enc") try: # Remove expired secrets current_time = datetime.utcnow() expired_keys = [] for key, secret_info in self.secrets.items(): if secret_info.get('expires_at'): expires_at = datetime.fromisoformat(secret_info['expires_at']) if current_time > expires_at: expired_keys.append(key) for key in expired_keys: del self.secrets[key] self.logger.info("Removed expired secret", key=key) # Encrypt and save encrypted_data = self._encrypt_data(json.dumps(self.secrets)) with open(secrets_file, 'w') as f: f.write(encrypted_data) self.logger.info("Secrets saved", count=len(self.secrets)) except Exception as e: self.logger.error("Failed to save secrets", error=str(e)) raise async def get_secret(self, key: str) -> Optional[str]: """Retrieve a secret by key.""" # Check if secret exists and not expired secret_info = self.secrets.get(key) if not secret_info: return None # Check expiry if secret_info.get('expires_at'): expires_at = datetime.fromisoformat(secret_info['expires_at']) if datetime.utcnow() > expires_at: # Remove expired secret del self.secrets[key] self._save_secrets() return None return secret_info['value'] async def set_secret(self, key: str, value: str, ttl: Optional[int] = None) -> bool: """Store a secret with optional TTL.""" try: secret_info = { 'value': value, 'created_at': datetime.utcnow().isoformat(), 'access_count': 0 } if ttl: expires_at = datetime.utcnow() + timedelta(seconds=ttl) secret_info['expires_at'] = expires_at.isoformat() self.secrets[key] = secret_info self._save_secrets() self.logger.info("Secret stored", key=key, ttl=ttl) return True except Exception as e: self.logger.error("Failed to store secret", key=key, error=str(e)) return False async def delete_secret(self, key: str) -> bool: """Delete a secret.""" try: if key in self.secrets: del self.secrets[key] self._save_secrets() self.logger.info("Secret deleted", key=key) return True return False except Exception as e: self.logger.error("Failed to delete secret", key=key, error=str(e)) return False async def list_secrets(self, prefix: str = "") -> List[str]: """List all secrets with optional prefix filter.""" current_time = datetime.utcnow() # Remove expired secrets and return filtered list valid_secrets = [] expired_keys = [] for key in self.secrets: # Check if matches prefix if prefix and not key.startswith(prefix): continue # Check expiry secret_info = self.secrets[key] if secret_info.get('expires_at'): expires_at = datetime.fromisoformat(secret_info['expires_at']) if current_time > expires_at: expired_keys.append(key) continue valid_secrets.append(key) # Clean up expired secrets for key in expired_keys: del self.secrets[key] if expired_keys: self._save_secrets() return valid_secrets # ============================================================================= # HashiCorp Vault Integration # ============================================================================= class VaultSecretsStore(SecretsStore): """HashiCorp Vault secrets storage.""" def __init__(self, vault_url: str, vault_token: str, mount_path: str = "secret"): self.vault_url = vault_url self.vault_token = vault_token self.mount_path = mount_path self.logger = structlog.get_logger() # Initialize vault client self.client = vault_secrets.Client( url=vault_url, token=vault_token ) async def get_secret(self, key: str) -> Optional[str]: """Retrieve a secret from Vault.""" try: secret_path = f"{self.mount_path}/data/{key}" response = await self.client.read(secret_path) if response and 'data' in response: return response['data']['value'] return None except vault_secrets.VaultNotFoundError: return None except Exception as e: self.logger.error("Vault get secret failed", key=key, error=str(e)) return None async def set_secret(self, key: str, value: str, ttl: Optional[int] = None) -> bool: """Store a secret in Vault.""" try: secret_path = f"{self.mount_path}/data/{key}" secret_data = { 'value': value, 'created_at': datetime.utcnow().isoformat() } if ttl: # Convert TTL to Lease Duration for Vault lease_duration = ttl secret_data['lease_duration'] = lease_duration response = await self.client.write(secret_path, **secret_data) self.logger.info("Secret stored in Vault", key=key, ttl=ttl) return True except Exception as e: self.logger.error("Vault set secret failed", key=key, error=str(e)) return False async def delete_secret(self, key: str) -> bool: """Delete a secret from Vault.""" try: secret_path = f"{self.mount_path}/data/{key}" await self.client.delete(secret_path) self.logger.info("Secret deleted from Vault", key=key) return True except vault_secrets.VaultNotFoundError: return True # Already deleted except Exception as e: self.logger.error("Vault delete secret failed", key=key, error=str(e)) return False async def list_secrets(self, prefix: str = "") -> List[str]: """List secrets in Vault.""" try: list_path = f"{self.mount_path}/metadata" response = await self.client.list(list_path) if not response or 'data' not in response: return [] secrets = [] for key_data in response['data']['keys']: secret_key = key_data.rstrip('/') if prefix and secret_key.startswith(prefix): secrets.append(secret_key) elif not prefix: secrets.append(secret_key) return secrets except Exception as e: self.logger.error("Vault list secrets failed", error=str(e)) return [] # ============================================================================= # AWS Secrets Manager Integration # ============================================================================= class AWSSecretsStore(SecretsStore): """AWS Secrets Manager secrets storage.""" def __init__(self, region_name: str = "us-east-1", secret_prefix: str = "mcp-orchestrator/"): self.region_name = region_name self.secret_prefix = secret_prefix self.logger = structlog.get_logger() # Initialize AWS clients self.secrets_manager = boto3.client('secretsmanager', region_name=region_name) def _get_secret_name(self, key: str) -> str: """Get the full secret name for AWS.""" return f"{self.secret_prefix}{key}" async def get_secret(self, key: str) -> Optional[str]: """Retrieve a secret from AWS Secrets Manager.""" try: secret_name = self._get_secret_name(key) response = self.secrets_manager.get_secret_value(SecretId=secret_name) return response['SecretString'] except ClientError as e: if e.response['Error']['Code'] == 'ResourceNotFoundException': return None self.logger.error("AWS get secret failed", key=key, error=str(e)) return None except Exception as e: self.logger.error("AWS get secret unexpected error", key=key, error=str(e)) return None async def set_secret(self, key: str, value: str, ttl: Optional[int] = None) -> bool: """Store a secret in AWS Secrets Manager.""" try: secret_name = self._get_secret_name(key) # AWS Secrets Manager doesn't support TTL, so we store metadata secret_value = { 'value': value, 'created_at': datetime.utcnow().isoformat(), 'ttl': ttl } self.secrets_manager.create_secret( Name=secret_name, SecretString=json.dumps(secret_value) ) self.logger.info("Secret stored in AWS Secrets Manager", key=key, ttl=ttl) return True except ClientError as e: if e.response['Error']['Code'] == 'ResourceAlreadyExistsException': # Update existing secret return await self._update_secret(key, value, ttl) self.logger.error("AWS set secret failed", key=key, error=str(e)) return False except Exception as e: self.logger.error("AWS set secret unexpected error", key=key, error=str(e)) return False async def _update_secret(self, key: str, value: str, ttl: Optional[int]) -> bool: """Update an existing secret in AWS Secrets Manager.""" try: secret_name = self._get_secret_name(key) secret_value = { 'value': value, 'created_at': datetime.utcnow().isoformat(), 'ttl': ttl } self.secrets_manager.update_secret( SecretId=secret_name, SecretString=json.dumps(secret_value) ) self.logger.info("Secret updated in AWS Secrets Manager", key=key) return True except Exception as e: self.logger.error("AWS update secret failed", key=key, error=str(e)) return False async def delete_secret(self, key: str) -> bool: """Delete a secret from AWS Secrets Manager.""" try: secret_name = self._get_secret_name(key) self.secrets_manager.delete_secret(SecretId=secret_name) self.logger.info("Secret deleted from AWS Secrets Manager", key=key) return True except ClientError as e: if e.response['Error']['Code'] == 'ResourceNotFoundException': return True # Already deleted self.logger.error("AWS delete secret failed", key=key, error=str(e)) return False except Exception as e: self.logger.error("AWS delete secret unexpected error", key=key, error=str(e)) return False async def list_secrets(self, prefix: str = "") -> List[str]: """List secrets in AWS Secrets Manager.""" try: # AWS Secrets Manager doesn't support prefix filtering in list, # so we get all and filter client-side response = self.secrets_manager.list_secrets() secrets = [] for secret in response['SecretList']: secret_key = secret['Name'] # Remove prefix for user-facing key if secret_key.startswith(self.secret_prefix): user_key = secret_key[len(self.secret_prefix):] if prefix and user_key.startswith(prefix): secrets.append(user_key) elif not prefix: secrets.append(user_key) return secrets except Exception as e: self.logger.error("AWS list secrets failed", error=str(e)) return [] # ============================================================================= # Environment Variables Store # ============================================================================= class EnvironmentSecretsStore(SecretsStore): """Environment variable-based secrets storage (for development).""" def __init__(self, env_prefix: str = "MCP_SECRET_"): self.env_prefix = env_prefix self.logger = structlog.get_logger() async def get_secret(self, key: str) -> Optional[str]: """Retrieve a secret from environment variables.""" env_key = f"{self.env_prefix}{key.upper()}" return os.getenv(env_key) async def set_secret(self, key: str, value: str, ttl: Optional[int] = None) -> bool: """Store a secret in environment variables.""" # In production, this would set the environment variable # For security, we don't actually set it here self.logger.warning( "Environment store is for development only", key=key ) return False async def delete_secret(self, key: str) -> bool: """Delete a secret from environment variables.""" env_key = f"{self.env_prefix}{key.upper()}" if env_key in os.environ: del os.environ[env_key] return True return False async def list_secrets(self, prefix: str = "") -> List[str]: """List secrets from environment variables.""" secrets = [] for env_key in os.environ: if env_key.startswith(self.env_prefix): key = env_key[len(self.env_prefix):].lower() if prefix and key.startswith(prefix): secrets.append(key) elif not prefix: secrets.append(key) return secrets # ============================================================================= # Secrets Manager Orchestrator # ============================================================================= class SecretsManager: """Central secrets management orchestrator with multiple backends.""" def __init__(self, config: Dict[str, Any]): self.config = config self.logger = structlog.get_logger() # Initialize backends self.backends: Dict[str, SecretsStore] = {} # Determine primary backend primary_backend = config.get('primary_backend', 'local') # Initialize backends based on configuration if config.get('local_enabled', True): if 'local' not in self.config.get('local', {}): # Generate a default encryption key encryption_key = Fernet.generate_key() self.config['local'] = {'encryption_key': base64.b64encode(encryption_key).decode()} local_store = LocalSecretsStore( base64.b64decode(self.config['local']['encryption_key']), self.config['local'].get('storage_path', './secrets') ) self.backends['local'] = local_store # Initialize Vault backend if configured vault_config = config.get('vault', {}) if vault_config.get('enabled') and all([ vault_config.get('url'), vault_config.get('token') ]): vault_store = VaultSecretsStore( vault_url=vault_config['url'], vault_token=vault_config['token'], mount_path=vault_config.get('mount_path', 'secret') ) self.backends['vault'] = vault_store # Initialize AWS Secrets Manager if configured aws_config = config.get('aws', {}) if aws_config.get('enabled'): aws_store = AWSSecretsStore( region_name=aws_config.get('region_name', 'us-east-1'), secret_prefix=aws_config.get('secret_prefix', 'mcp-orchestrator/') ) self.backends['aws'] = aws_store # Initialize environment backend for development if config.get('env_enabled', True): env_store = EnvironmentSecretsStore( env_prefix=config.get('env_prefix', 'MCP_SECRET_') ) self.backends['env'] = env_store # Set primary backend self.primary_backend = self.backends.get(primary_backend) if not self.primary_backend: raise ValueError(f"Primary backend '{primary_backend}' not available") self.logger.info( "Secrets manager initialized", backends=list(self.backends.keys()), primary=primary_backend ) async def get_secret(self, key: str, backend: Optional[str] = None) -> Optional[str]: """Get secret with optional backend selection.""" # Try specified backend first if backend and backend in self.backends: return await self.backends[backend].get_secret(key) # Try primary backend secret = await self.primary_backend.get_secret(key) if secret: return secret # Fallback to other backends for name, backend_store in self.backends.items(): if name != self.primary_backend.__class__.__name__.lower().replace('secretsstore', ''): secret = await backend_store.get_secret(key) if secret: self.logger.info( "Secret found in fallback backend", key=key, backend=name ) return secret return None async def set_secret(self, key: str, value: str, backend: Optional[str] = None, ttl: Optional[int] = None) -> bool: """Set secret with optional backend selection.""" # Use specified backend or primary if backend and backend in self.backends: return await self.backends[backend].set_secret(key, value, ttl) # Set in all backends for redundancy success_count = 0 for name, backend_store in self.backends.items(): try: if await backend_store.set_secret(key, value, ttl): success_count += 1 except Exception as e: self.logger.warning( "Failed to set secret in backend", key=key, backend=name, error=str(e) ) return success_count > 0 async def delete_secret(self, key: str, backend: Optional[str] = None) -> bool: """Delete secret from specified backend or all.""" if backend and backend in self.backends: return await self.backends[backend].delete_secret(key) # Delete from all backends success_count = 0 for name, backend_store in self.backends.items(): try: if await backend_store.delete_secret(key): success_count += 1 except Exception as e: self.logger.warning( "Failed to delete secret from backend", key=key, backend=name, error=str(e) ) return success_count > 0 async def list_secrets(self, backend: Optional[str] = None, prefix: str = "") -> List[str]: """List secrets from specified backend or all.""" if backend and backend in self.backends: return await self.backends[backend].list_secrets(prefix) # Collect from all backends all_secrets = set() for name, backend_store in self.backends.items(): try: secrets = await backend_store.list_secrets(prefix) all_secrets.update(secrets) except Exception as e: self.logger.warning( "Failed to list secrets from backend", backend=name, error=str(e) ) return sorted(list(all_secrets)) async def rotate_secrets(self, old_key: str, new_key: str, old_backend: Optional[str] = None, new_backend: Optional[str] = None) -> bool: """Rotate a secret from one key/backend to another.""" try: # Get old secret old_value = await self.get_secret(old_key, old_backend) if not old_value: self.logger.error("Old secret not found", key=old_key) return False # Set new secret success = await self.set_secret(new_key, old_value, new_backend) if success: # Delete old secret await self.delete_secret(old_key, old_backend) self.logger.info( "Secret rotated successfully", old_key=old_key, new_key=new_key ) return True return False except Exception as e: self.logger.error( "Secret rotation failed", old_key=old_key, new_key=new_key, error=str(e) ) return False def get_available_backends(self) -> List[str]: """Get list of available backends.""" return list(self.backends.keys()) async def health_check(self) -> Dict[str, bool]: """Perform health check on all backends.""" health_status = {} for name, backend in self.backends.items(): try: # Test with a temporary secret test_key = f"health_check_{int(time.time())}" test_value = "health_check_value" set_success = await backend.set_secret(test_key, test_value) get_success = await backend.get_secret(test_key) == test_value delete_success = await backend.delete_secret(test_key) health_status[name] = set_success and get_success and delete_success except Exception as e: self.logger.error( "Backend health check failed", backend=name, error=str(e) ) health_status[name] = False return health_status # ============================================================================= # Configuration Loader with Secrets Integration # ============================================================================= class ConfigLoader: """Load configuration with integrated secrets management.""" def __init__(self, config_files: List[str] = None, secrets_config: Dict[str, Any] = None): self.config_files = config_files or [ 'config/default.yaml', 'config/secrets.yaml', 'config/production.yaml' ] self.secrets_config = secrets_config or {} self.logger = structlog.get_logger() # Initialize secrets manager self.secrets_manager = SecretsManager(self.secrets_config) # Loaded configuration self.config = {} def _resolve_secrets_in_config(self, obj: Any) -> Any: """Recursively resolve secret references in configuration.""" if isinstance(obj, dict): resolved = {} for key, value in obj.items(): if isinstance(value, str) and value.startswith('secret://'): # Extract secret reference secret_path = value[9:] # Remove 'secret://' prefix # Run synchronous resolution secret_value = asyncio.run( self.secrets_manager.get_secret(secret_path) ) if secret_value is None: self.logger.warning( "Secret not found", reference=secret_path ) resolved[key] = None else: # Try to parse as JSON try: resolved[key] = json.loads(secret_value) except (json.JSONDecodeError, ValueError): resolved[key] = secret_value else: resolved[key] = self._resolve_secrets_in_config(value) return resolved elif isinstance(obj, list): return [self._resolve_secrets_in_config(item) for item in obj] else: return obj def load_config(self) -> Dict[str, Any]: """Load configuration and resolve secrets.""" try: # Load configuration files config = {} for file_path in self.config_files: if os.path.exists(file_path): with open(file_path, 'r') as f: if file_path.endswith('.yaml') or file_path.endswith('.yml'): import yaml file_config = yaml.safe_load(f) elif file_path.endswith('.json'): file_config = json.load(f) else: continue # Merge configuration if isinstance(file_config, dict): config.update(file_config) # Resolve secrets references resolved_config = self._resolve_secrets_in_config(config) # Store secrets manager in config for other components resolved_config['secrets_manager'] = self.secrets_manager self.config = resolved_config self.logger.info( "Configuration loaded with secrets resolved", files_loaded=len(self.config_files) ) return resolved_config except Exception as e: self.logger.error("Failed to load configuration", error=str(e)) raise # ============================================================================= # Usage Examples # ============================================================================= async def example_usage(): """Example usage of the secrets management system.""" # Configuration secrets_config = { 'primary_backend': 'local', 'local_enabled': True, 'local': { 'encryption_key': base64.b64encode(Fernet.generate_key()).decode(), 'storage_path': './secrets' }, 'vault_enabled': False, # Enable for production with Vault 'aws_enabled': False, # Enable for AWS deployments 'env_enabled': True # Always enabled for development } # Initialize loader loader = ConfigLoader( config_files=['config/default.yaml', 'config/secrets.yaml'], secrets_config=secrets_config ) # Load configuration with secrets config = loader.load_config() # Get secrets manager secrets_manager = config['secrets_manager'] # Store a secret await secrets_manager.set_secret( key='database_password', value='super_secret_password_123', backend='local', ttl=3600 ) # Retrieve the secret password = await secrets_manager.get_secret('database_password') print(f"Retrieved password: {password}") # List all secrets secrets = await secrets_manager.list_secrets() print(f"Available secrets: {secrets}") # Health check health = await secrets_manager.health_check() print(f"Backend health: {health}") if __name__ == "__main__": import time asyncio.run(example_usage())