# Author: rajkumarrawal
# Commit: 2ec0d39 ("Initial commit")
"""
Secrets Management for MCP Orchestration Platform
Enterprise-grade secrets handling with encryption and access control
"""
import asyncio
import base64
import json
import os
import time
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from typing import Dict, Any, Optional, Set, List

import boto3
import structlog
import vault_secrets
from botocore.exceptions import ClientError
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
# =============================================================================
# Secrets Storage Interfaces
# =============================================================================
class SecretsStore(ABC):
    """Contract shared by every secrets backend.

    Each operation is a coroutine; concrete backends must override all four.
    """

    @abstractmethod
    async def get_secret(self, key: str) -> Optional[str]:
        """Return the secret stored under *key*, or None when there is none."""

    @abstractmethod
    async def set_secret(self, key: str, value: str, ttl: Optional[int] = None) -> bool:
        """Store *value* under *key*; *ttl* optionally bounds its lifetime.

        Returns a success flag.
        """

    @abstractmethod
    async def delete_secret(self, key: str) -> bool:
        """Remove the secret stored under *key*; returns a success flag."""

    @abstractmethod
    async def list_secrets(self, prefix: str = "") -> List[str]:
        """Return the stored keys, restricted to those starting with *prefix*."""
# =============================================================================
# Local Encrypted File Storage
# =============================================================================
class LocalSecretsStore(SecretsStore):
    """Local file-based secrets storage with encryption.

    Secrets live in memory in ``self.secrets`` and are persisted as a single
    Fernet-encrypted JSON document at ``<storage_path>/secrets.enc``. Every
    mutation rewrites the whole file. File I/O is synchronous, so the async
    methods block the event loop while it runs.
    """

    SECRETS_FILENAME = "secrets.enc"

    def __init__(self, secret_key: bytes, storage_path: str = "./secrets"):
        """Create the store and load any previously saved secrets.

        Args:
            secret_key: Fernet key used to encrypt and decrypt the secrets file.
            storage_path: Directory holding ``secrets.enc``. If it does not
                exist yet, it is created with owner-only permissions.
        """
        self.secret_key = secret_key
        self.storage_path = storage_path
        self.logger = structlog.get_logger()
        self.cipher = Fernet(self.secret_key)
        # key -> {'value', 'created_at', 'access_count'[, 'expires_at']}
        self.secrets: Dict[str, Dict[str, Any]] = {}
        # Set when an existing secrets file could not be read. The next save
        # then moves that file aside instead of overwriting it.
        self._unreadable_file = False
        # `mode` only applies when the directory is newly created.
        os.makedirs(storage_path, mode=0o700, exist_ok=True)
        self._load_secrets()

    def _secrets_file(self) -> str:
        """Return the path of the encrypted secrets file."""
        return os.path.join(self.storage_path, self.SECRETS_FILENAME)

    def _generate_key_from_passphrase(self, passphrase: str, salt: Optional[bytes] = None) -> bytes:
        """Derive a Fernet-compatible key from *passphrase* with PBKDF2-HMAC-SHA256.

        If *salt* is omitted, a random 16-byte salt is generated and NOT
        returned, so the same key can never be derived again. Pass an explicit,
        persisted salt when the key must be reproducible.
        """
        if salt is None:
            salt = os.urandom(16)
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=100000,
        )
        return base64.urlsafe_b64encode(kdf.derive(passphrase.encode()))

    def _encrypt_data(self, data: str) -> str:
        """Encrypt *data* with the store's Fernet cipher; logs and re-raises on failure."""
        try:
            return self.cipher.encrypt(data.encode()).decode()
        except Exception as e:
            self.logger.error("Encryption failed", error=str(e))
            raise

    def _decrypt_data(self, encrypted_data: str) -> str:
        """Decrypt a Fernet token produced by _encrypt_data; logs and re-raises on failure."""
        try:
            return self.cipher.decrypt(encrypted_data.encode()).decode()
        except Exception as e:
            self.logger.error("Decryption failed", error=str(e))
            raise

    @staticmethod
    def _is_expired(secret_info: Dict[str, Any], now: datetime) -> bool:
        """Return True if *secret_info* has an ``expires_at`` earlier than *now*."""
        expires_at = secret_info.get('expires_at')
        return bool(expires_at) and now > datetime.fromisoformat(expires_at)

    def _purge_expired(self) -> None:
        """Drop every expired secret from the in-memory map."""
        now = datetime.utcnow()
        expired_keys = [key for key, info in self.secrets.items() if self._is_expired(info, now)]
        for key in expired_keys:
            del self.secrets[key]
            self.logger.info("Removed expired secret", key=key)

    def _load_secrets(self):
        """Load secrets from the encrypted file, if it exists.

        This is best effort: if the file cannot be decrypted or parsed, the
        error is logged and the store starts empty. The file is flagged so
        that _save_secrets preserves it instead of clobbering it.
        """
        secrets_file = self._secrets_file()
        if not os.path.exists(secrets_file):
            return
        try:
            with open(secrets_file, 'r') as f:
                encrypted_data = f.read()
            decrypted_data = self._decrypt_data(encrypted_data)
            self.secrets = json.loads(decrypted_data)
            self.logger.info("Secrets loaded", count=len(self.secrets))
        except Exception as e:
            self.logger.error("Failed to load secrets", error=str(e))
            self.secrets = {}
            self._unreadable_file = True

    def _preserve_unreadable_file(self, secrets_file: str) -> None:
        """Rename an unreadable secrets file aside so the next write cannot destroy it."""
        if os.path.exists(secrets_file):
            stamp = datetime.utcnow().strftime('%Y%m%dT%H%M%S%f')
            backup = f"{secrets_file}.unreadable-{stamp}"
            os.replace(secrets_file, backup)
            self.logger.warning("Preserved unreadable secrets file", backup=backup)
        self._unreadable_file = False

    @staticmethod
    def _atomic_write(path: str, data: str) -> None:
        """Write *data* to *path* atomically, creating it with 0600 permissions.

        The data goes to ``<path>.tmp`` first and is then moved over *path* with
        ``os.replace``. A crash mid-write therefore never leaves a truncated
        secrets file.
        """
        tmp_path = f"{path}.tmp"
        fd = os.open(tmp_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        try:
            with os.fdopen(fd, 'w') as f:
                f.write(data)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp_path, path)
        except BaseException:
            # Don't leave a partially written temp file behind.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise

    def _save_secrets(self):
        """Purge expired secrets, then encrypt and atomically persist the rest.

        If the file on disk could not be read at load time, it is first
        renamed to ``secrets.enc.unreadable-<timestamp>`` rather than
        overwritten.

        Raises:
            Exception: whatever encryption or file I/O raised, after logging it.
        """
        secrets_file = self._secrets_file()
        try:
            self._purge_expired()
            # getattr: tolerate instances built without __init__.
            if getattr(self, '_unreadable_file', False):
                self._preserve_unreadable_file(secrets_file)
            encrypted_data = self._encrypt_data(json.dumps(self.secrets))
            self._atomic_write(secrets_file, encrypted_data)
            self.logger.info("Secrets saved", count=len(self.secrets))
        except Exception as e:
            self.logger.error("Failed to save secrets", error=str(e))
            raise

    async def get_secret(self, key: str) -> Optional[str]:
        """Return the value stored under *key*, or None if it is absent or expired.

        An expired entry is removed and the file rewritten. ``access_count`` is
        not updated here.
        """
        secret_info = self.secrets.get(key)
        if not secret_info:
            return None
        if self._is_expired(secret_info, datetime.utcnow()):
            del self.secrets[key]
            self._save_secrets()
            return None
        return secret_info['value']

    async def set_secret(self, key: str, value: str, ttl: Optional[int] = None) -> bool:
        """Store *value* under *key* and persist.

        Args:
            ttl: Lifetime in seconds. A falsy value (None or 0) means no expiry.

        Returns:
            True on success, False if persisting failed (the error is logged).
        """
        try:
            now = datetime.utcnow()
            secret_info = {
                'value': value,
                'created_at': now.isoformat(),
                'access_count': 0
            }
            if ttl:
                secret_info['expires_at'] = (now + timedelta(seconds=ttl)).isoformat()
            self.secrets[key] = secret_info
            self._save_secrets()
            self.logger.info("Secret stored", key=key, ttl=ttl)
            return True
        except Exception as e:
            self.logger.error("Failed to store secret", key=key, error=str(e))
            return False

    async def delete_secret(self, key: str) -> bool:
        """Remove *key* and persist. Returns False if it was absent or saving failed."""
        try:
            if key not in self.secrets:
                return False
            del self.secrets[key]
            self._save_secrets()
            self.logger.info("Secret deleted", key=key)
            return True
        except Exception as e:
            self.logger.error("Failed to delete secret", key=key, error=str(e))
            return False

    async def list_secrets(self, prefix: str = "") -> List[str]:
        """Return the non-expired keys starting with *prefix*, in insertion order.

        Expired keys among the matches are dropped. If any were dropped, the
        file is rewritten.
        """
        now = datetime.utcnow()
        matching = [key for key in self.secrets if key.startswith(prefix)]
        expired_keys = {key for key in matching if self._is_expired(self.secrets[key], now)}
        for key in expired_keys:
            del self.secrets[key]
        if expired_keys:
            self._save_secrets()
        return [key for key in matching if key not in expired_keys]
# =============================================================================
# HashiCorp Vault Integration
# =============================================================================
class VaultSecretsStore(SecretsStore):
    """Secrets backend backed by HashiCorp Vault via the ``vault_secrets`` client.

    Each key is read, written and deleted at ``<mount_path>/data/<key>``.
    Keys are listed from ``<mount_path>/metadata``. Failures are logged and
    reported as None / False / [] rather than raised.

    NOTE(review): the ``/data/`` and ``/metadata`` paths look like the KV v2
    layout, whose read responses usually nest the payload under
    ``data.data``. This class reads ``response['data']['value']``. Confirm
    against the real client's response shape.
    """

    def __init__(self, vault_url: str, vault_token: str, mount_path: str = "secret"):
        self.vault_url = vault_url
        self.vault_token = vault_token
        self.mount_path = mount_path
        self.logger = structlog.get_logger()
        self.client = vault_secrets.Client(url=vault_url, token=vault_token)

    def _path_for(self, key: str) -> str:
        """Vault path used for reading, writing and deleting *key*."""
        return f"{self.mount_path}/data/{key}"

    async def get_secret(self, key: str) -> Optional[str]:
        """Fetch *key*'s ``value`` field; None when missing or on any error."""
        try:
            payload = await self.client.read(self._path_for(key))
            return payload['data']['value'] if payload and 'data' in payload else None
        except vault_secrets.VaultNotFoundError:
            return None
        except Exception as exc:
            self.logger.error("Vault get secret failed", key=key, error=str(exc))
            return None

    async def set_secret(self, key: str, value: str, ttl: Optional[int] = None) -> bool:
        """Write *value* (plus a creation timestamp) under *key*."""
        try:
            record = {'value': value, 'created_at': datetime.utcnow().isoformat()}
            if ttl:
                # Only stored next to the value; nothing in this class enforces it.
                record['lease_duration'] = ttl
            await self.client.write(self._path_for(key), **record)
            self.logger.info("Secret stored in Vault", key=key, ttl=ttl)
            return True
        except Exception as exc:
            self.logger.error("Vault set secret failed", key=key, error=str(exc))
            return False

    async def delete_secret(self, key: str) -> bool:
        """Delete *key*; a key Vault reports as missing counts as deleted."""
        try:
            await self.client.delete(self._path_for(key))
            self.logger.info("Secret deleted from Vault", key=key)
            return True
        except vault_secrets.VaultNotFoundError:
            return True  # Already deleted
        except Exception as exc:
            self.logger.error("Vault delete secret failed", key=key, error=str(exc))
            return False

    async def list_secrets(self, prefix: str = "") -> List[str]:
        """List keys under the metadata path (trailing '/' stripped), filtered by *prefix*."""
        try:
            listing = await self.client.list(f"{self.mount_path}/metadata")
            if not listing or 'data' not in listing:
                return []
            names = (entry.rstrip('/') for entry in listing['data']['keys'])
            return [name for name in names if not prefix or name.startswith(prefix)]
        except Exception as exc:
            self.logger.error("Vault list secrets failed", error=str(exc))
            return []
# =============================================================================
# AWS Secrets Manager Integration
# =============================================================================
class AWSSecretsStore(SecretsStore):
    """AWS Secrets Manager secrets storage.

    Each key maps to its own AWS secret named ``<secret_prefix><key>``.

    set_secret stores the value inside a JSON envelope
    ``{"value": ..., "created_at": ..., "ttl": ...}``. get_secret unwraps
    that envelope and treats the secret as missing once ``ttl`` seconds have
    passed since ``created_at``; the AWS secret itself is not deleted. A
    secret string that is not such an envelope (e.g. one created outside this
    class) is returned verbatim.

    NOTE(review): the boto3 client is synchronous, so each call blocks the
    event loop while it runs.
    """

    def __init__(self, region_name: str = "us-east-1", secret_prefix: str = "mcp-orchestrator/"):
        self.region_name = region_name
        self.secret_prefix = secret_prefix
        self.logger = structlog.get_logger()
        self.secrets_manager = boto3.client('secretsmanager', region_name=region_name)

    def _get_secret_name(self, key: str) -> str:
        """Get the full secret name for AWS."""
        return f"{self.secret_prefix}{key}"

    @staticmethod
    def _build_envelope(value: str, ttl: Optional[int]) -> str:
        """Serialize *value* with its creation time and TTL metadata."""
        return json.dumps({
            'value': value,
            'created_at': datetime.utcnow().isoformat(),
            'ttl': ttl
        })

    @staticmethod
    def _unwrap_secret_string(secret_string: str) -> Optional[str]:
        """Return the user value held in *secret_string*, or None if its TTL has passed.

        Strings that are not an envelope written by _build_envelope are
        returned unchanged.
        """
        try:
            envelope = json.loads(secret_string)
        except (TypeError, ValueError):
            return secret_string
        if not isinstance(envelope, dict) or 'value' not in envelope or 'created_at' not in envelope:
            return secret_string
        ttl = envelope.get('ttl')
        if ttl:
            try:
                expires_at = datetime.fromisoformat(envelope['created_at']) + timedelta(seconds=ttl)
                expired = datetime.utcnow() > expires_at
            except (TypeError, ValueError, OverflowError):
                # Unusable expiry metadata: behave as if no TTL had been set.
                expired = False
            if expired:
                return None
        return envelope['value']

    async def get_secret(self, key: str) -> Optional[str]:
        """Return the value stored for *key*, or None if missing, expired or on error."""
        try:
            response = self.secrets_manager.get_secret_value(SecretId=self._get_secret_name(key))
        except ClientError as e:
            if e.response['Error']['Code'] != 'ResourceNotFoundException':
                self.logger.error("AWS get secret failed", key=key, error=str(e))
            return None
        except Exception as e:
            self.logger.error("AWS get secret unexpected error", key=key, error=str(e))
            return None

        secret_string = response.get('SecretString')
        if secret_string is None:
            # Binary-only secrets cannot be represented by this str-based interface.
            self.logger.warning("AWS secret has no string value", key=key)
            return None
        return self._unwrap_secret_string(secret_string)

    async def set_secret(self, key: str, value: str, ttl: Optional[int] = None) -> bool:
        """Create the secret, or update it if it already exists.

        AWS has no native TTL. *ttl* is stored in the envelope and enforced by
        get_secret.
        """
        try:
            self.secrets_manager.create_secret(
                Name=self._get_secret_name(key),
                SecretString=self._build_envelope(value, ttl)
            )
            self.logger.info("Secret stored in AWS Secrets Manager", key=key, ttl=ttl)
            return True
        except ClientError as e:
            if e.response['Error']['Code'] == 'ResourceAlreadyExistsException':
                return await self._update_secret(key, value, ttl)
            self.logger.error("AWS set secret failed", key=key, error=str(e))
            return False
        except Exception as e:
            self.logger.error("AWS set secret unexpected error", key=key, error=str(e))
            return False

    async def _update_secret(self, key: str, value: str, ttl: Optional[int]) -> bool:
        """Replace the value of an existing secret; resets ``created_at``."""
        try:
            self.secrets_manager.update_secret(
                SecretId=self._get_secret_name(key),
                SecretString=self._build_envelope(value, ttl)
            )
            self.logger.info("Secret updated in AWS Secrets Manager", key=key)
            return True
        except Exception as e:
            self.logger.error("AWS update secret failed", key=key, error=str(e))
            return False

    async def delete_secret(self, key: str) -> bool:
        """Delete a secret from AWS Secrets Manager; a missing secret counts as deleted.

        NOTE(review): per the AWS docs, DeleteSecret without
        ForceDeleteWithoutRecovery only schedules deletion after a recovery
        window. Re-creating the same name during that window fails. Confirm
        whether that is the intended behavior.
        """
        try:
            self.secrets_manager.delete_secret(SecretId=self._get_secret_name(key))
            self.logger.info("Secret deleted from AWS Secrets Manager", key=key)
            return True
        except ClientError as e:
            if e.response['Error']['Code'] == 'ResourceNotFoundException':
                return True  # Already deleted
            self.logger.error("AWS delete secret failed", key=key, error=str(e))
            return False
        except Exception as e:
            self.logger.error("AWS delete secret unexpected error", key=key, error=str(e))
            return False

    async def list_secrets(self, prefix: str = "") -> List[str]:
        """List user-facing keys (``secret_prefix`` stripped) that start with *prefix*.

        Walks every result page, since a single ListSecrets call returns only
        one page. Filtering is done client-side.
        """
        try:
            secrets = []
            paginator = self.secrets_manager.get_paginator('list_secrets')
            for page in paginator.paginate():
                for secret in page.get('SecretList', []):
                    secret_name = secret['Name']
                    if not secret_name.startswith(self.secret_prefix):
                        continue
                    user_key = secret_name[len(self.secret_prefix):]
                    if user_key.startswith(prefix):
                        secrets.append(user_key)
            return secrets
        except Exception as e:
            self.logger.error("AWS list secrets failed", error=str(e))
            return []
# =============================================================================
# Environment Variables Store
# =============================================================================
class EnvironmentSecretsStore(SecretsStore):
    """Secrets backend over process environment variables (development use).

    A key ``foo`` maps to the variable ``<env_prefix>FOO``. Writes are never
    performed: set_secret only logs a warning and reports failure.
    """

    def __init__(self, env_prefix: str = "MCP_SECRET_"):
        self.env_prefix = env_prefix
        self.logger = structlog.get_logger()

    def _env_name(self, key: str) -> str:
        """Environment variable name that backs *key*."""
        return f"{self.env_prefix}{key.upper()}"

    async def get_secret(self, key: str) -> Optional[str]:
        """Return the variable's value, or None if it is not set."""
        return os.environ.get(self._env_name(key))

    async def set_secret(self, key: str, value: str, ttl: Optional[int] = None) -> bool:
        """Refuse to write: log a development-only warning and return False."""
        self.logger.warning("Environment store is for development only", key=key)
        return False

    async def delete_secret(self, key: str) -> bool:
        """Unset the backing variable; False when it was not set."""
        name = self._env_name(key)
        if name not in os.environ:
            return False
        del os.environ[name]
        return True

    async def list_secrets(self, prefix: str = "") -> List[str]:
        """Return lower-cased keys of all prefixed variables that start with *prefix*.

        NOTE: *prefix* is compared as given against the lower-cased key, so an
        upper-case prefix matches nothing.
        """
        cut = len(self.env_prefix)
        keys = (name[cut:].lower() for name in os.environ if name.startswith(self.env_prefix))
        return [key for key in keys if not prefix or key.startswith(prefix)]
# =============================================================================
# Secrets Manager Orchestrator
# =============================================================================
class SecretsManager:
    """Central secrets management orchestrator with multiple backends.

    Backends are registered by name ('local', 'vault', 'aws', 'env') according
    to the configuration. One of them is the primary backend, which
    get_secret consults before falling back to the others.
    """

    def __init__(self, config: Dict[str, Any]):
        """Build the enabled backends from *config*.

        Recognised keys:
            primary_backend: name of the primary backend (default 'local').
            local_enabled: register the local encrypted-file backend (default True).
            local: {'encryption_key': base64 of a Fernet key,
                    'storage_path': directory, default './secrets'}.
            vault: {'enabled', 'url', 'token', 'mount_path'}; registered only
                when enabled and both url and token are set.
            aws: {'enabled', 'region_name', 'secret_prefix'}.
            env_enabled: register the environment-variable backend (default True).
            env_prefix: variable prefix for that backend (default 'MCP_SECRET_').

        *config* is kept by reference. If no local encryption key is
        configured, a generated one is written back into ``config['local']``.

        Raises:
            ValueError: if the primary backend is not among the enabled ones.
        """
        self.config = config
        self.logger = structlog.get_logger()
        self.backends: Dict[str, SecretsStore] = {}
        primary_backend = config.get('primary_backend', 'local')

        if config.get('local_enabled', True):
            self.backends['local'] = self._build_local_backend()

        vault_config = config.get('vault') or {}
        if vault_config.get('enabled') and vault_config.get('url') and vault_config.get('token'):
            self.backends['vault'] = VaultSecretsStore(
                vault_url=vault_config['url'],
                vault_token=vault_config['token'],
                mount_path=vault_config.get('mount_path', 'secret')
            )

        aws_config = config.get('aws') or {}
        if aws_config.get('enabled'):
            self.backends['aws'] = AWSSecretsStore(
                region_name=aws_config.get('region_name', 'us-east-1'),
                secret_prefix=aws_config.get('secret_prefix', 'mcp-orchestrator/')
            )

        if config.get('env_enabled', True):
            self.backends['env'] = EnvironmentSecretsStore(
                env_prefix=config.get('env_prefix', 'MCP_SECRET_')
            )

        self.primary_backend_name = primary_backend
        self.primary_backend = self.backends.get(primary_backend)
        if self.primary_backend is None:
            raise ValueError(f"Primary backend '{primary_backend}' not available")

        self.logger.info(
            "Secrets manager initialized",
            backends=list(self.backends.keys()),
            primary=primary_backend
        )

    def _build_local_backend(self) -> 'LocalSecretsStore':
        """Create the local backend from ``config['local']``.

        An encryption key is generated only if none is configured.
        """
        local_config = self.config.get('local') or {}
        if not local_config.get('encryption_key'):
            # A generated key exists only in this process (and in config['local']).
            # Secrets saved with it are unreadable after a restart unless the
            # caller persists the key.
            local_config = {
                **local_config,
                'encryption_key': base64.b64encode(Fernet.generate_key()).decode()
            }
            self.config['local'] = local_config
            self.logger.warning("No local encryption key configured; generated an ephemeral key")
        return LocalSecretsStore(
            base64.b64decode(local_config['encryption_key']),
            local_config.get('storage_path', './secrets')
        )

    async def get_secret(self, key: str, backend: Optional[str] = None) -> Optional[str]:
        """Return the secret for *key*.

        If *backend* names a registered backend, only that backend is queried.
        Otherwise the primary is tried first, then every other backend in
        registration order. None means no backend had the key.
        """
        if backend and backend in self.backends:
            return await self.backends[backend].get_secret(key)

        secret = await self.primary_backend.get_secret(key)
        if secret is not None:
            return secret

        for name, backend_store in self.backends.items():
            if backend_store is self.primary_backend:
                continue  # already queried above
            secret = await backend_store.get_secret(key)
            if secret is not None:
                self.logger.info(
                    "Secret found in fallback backend",
                    key=key,
                    backend=name
                )
                return secret
        return None

    async def set_secret(self, key: str, value: str,
                         backend: Optional[str] = None,
                         ttl: Optional[int] = None) -> bool:
        """Store a secret.

        If *backend* names a registered backend, write only there. Otherwise
        write to every registered backend for redundancy, and return True if
        at least one accepted it. Per-backend exceptions are logged, not raised.
        """
        if backend and backend in self.backends:
            return await self.backends[backend].set_secret(key, value, ttl)

        success_count = 0
        for name, backend_store in self.backends.items():
            try:
                if await backend_store.set_secret(key, value, ttl):
                    success_count += 1
            except Exception as e:
                self.logger.warning(
                    "Failed to set secret in backend",
                    key=key,
                    backend=name,
                    error=str(e)
                )
        return success_count > 0

    async def delete_secret(self, key: str, backend: Optional[str] = None) -> bool:
        """Delete from the named backend, or from all.

        When deleting from all backends, returns True if any reported success.
        """
        if backend and backend in self.backends:
            return await self.backends[backend].delete_secret(key)

        success_count = 0
        for name, backend_store in self.backends.items():
            try:
                if await backend_store.delete_secret(key):
                    success_count += 1
            except Exception as e:
                self.logger.warning(
                    "Failed to delete secret from backend",
                    key=key,
                    backend=name,
                    error=str(e)
                )
        return success_count > 0

    async def list_secrets(self, backend: Optional[str] = None, prefix: str = "") -> List[str]:
        """List keys from the named backend (its own order), or the sorted union of all."""
        if backend and backend in self.backends:
            return await self.backends[backend].list_secrets(prefix)

        all_secrets = set()
        for name, backend_store in self.backends.items():
            try:
                all_secrets.update(await backend_store.list_secrets(prefix))
            except Exception as e:
                self.logger.warning(
                    "Failed to list secrets from backend",
                    backend=name,
                    error=str(e)
                )
        return sorted(all_secrets)

    async def rotate_secrets(self, old_key: str, new_key: str,
                             old_backend: Optional[str] = None,
                             new_backend: Optional[str] = None) -> bool:
        """Copy a secret to a new key and/or backend, then delete the original.

        This moves an existing value; it does not generate a new one. The
        original is not deleted when it shares the destination's key and its
        backend could be the destination's. Otherwise the value just written
        would be wiped.

        Returns:
            True if the value was written to the new location.
        """
        def effective(name: Optional[str]) -> Optional[str]:
            # Unknown backend names behave like None ("all backends") in the calls below.
            return name if name in self.backends else None

        try:
            old_value = await self.get_secret(old_key, old_backend)
            if old_value is None:
                self.logger.error("Old secret not found", key=old_key)
                return False

            if not await self.set_secret(new_key, old_value, new_backend):
                return False

            src, dst = effective(old_backend), effective(new_backend)
            same_location = old_key == new_key and (src is None or dst is None or src == dst)
            if not same_location:
                await self.delete_secret(old_key, old_backend)

            self.logger.info(
                "Secret rotated successfully",
                old_key=old_key,
                new_key=new_key
            )
            return True
        except Exception as e:
            self.logger.error(
                "Secret rotation failed",
                old_key=old_key,
                new_key=new_key,
                error=str(e)
            )
            return False

    def get_available_backends(self) -> List[str]:
        """Get list of registered backend names."""
        return list(self.backends.keys())

    async def health_check(self) -> Dict[str, bool]:
        """Round-trip a temporary secret through each backend.

        A backend is healthy only if set, get and delete all succeed. The env
        backend's set_secret always returns False, so it always reports False.
        """
        health_status: Dict[str, bool] = {}
        for name, backend in self.backends.items():
            try:
                test_key = f"health_check_{int(time.time())}"
                test_value = "health_check_value"
                set_success = await backend.set_secret(test_key, test_value)
                get_success = await backend.get_secret(test_key) == test_value
                delete_success = await backend.delete_secret(test_key)
                health_status[name] = bool(set_success and get_success and delete_success)
            except Exception as e:
                self.logger.error(
                    "Backend health check failed",
                    backend=name,
                    error=str(e)
                )
                health_status[name] = False
        return health_status
# =============================================================================
# Configuration Loader with Secrets Integration
# =============================================================================
class ConfigLoader:
    """Load configuration files and resolve ``secret://<key>`` references.

    Any string value (in a dict, in a list, or at the top level) of the form
    ``secret://<key>`` is replaced by the secret fetched from the
    SecretsManager. A secret that parses as JSON is replaced by the parsed
    value; a missing secret becomes None.
    """

    SECRET_REF_PREFIX = 'secret://'

    def __init__(self, config_files: Optional[List[str]] = None,
                 secrets_config: Optional[Dict[str, Any]] = None):
        """
        Args:
            config_files: Files to load in order. Later files override
                top-level keys of earlier ones. Missing files are skipped.
            secrets_config: Passed to SecretsManager.
        """
        self.config_files = config_files or [
            'config/default.yaml',
            'config/secrets.yaml',
            'config/production.yaml'
        ]
        self.secrets_config = secrets_config or {}
        self.logger = structlog.get_logger()
        self.secrets_manager = SecretsManager(self.secrets_config)
        # Last configuration returned by load_config().
        self.config: Dict[str, Any] = {}

    @staticmethod
    def _run_sync(coro: Any) -> Any:
        """Run *coro* to completion from synchronous code and return its result.

        ``asyncio.run`` cannot be used while an event loop is already running
        in this thread (e.g. load_config called from async code). In that
        case the coroutine runs on a fresh loop in a worker thread, and this
        thread blocks until it finishes.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            return asyncio.run(coro)
        import concurrent.futures
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(asyncio.run, coro).result()

    def _resolve_secret_reference(self, secret_path: str) -> Any:
        """Fetch *secret_path*; JSON-decode it when possible, None if missing."""
        secret_value = self._run_sync(self.secrets_manager.get_secret(secret_path))
        if secret_value is None:
            self.logger.warning("Secret not found", reference=secret_path)
            return None
        try:
            return json.loads(secret_value)
        except (TypeError, ValueError):  # json.JSONDecodeError is a ValueError
            return secret_value

    def _resolve_secrets_in_config(self, obj: Any) -> Any:
        """Return a copy of *obj* with every ``secret://`` string resolved, recursively."""
        if isinstance(obj, dict):
            return {key: self._resolve_secrets_in_config(value) for key, value in obj.items()}
        if isinstance(obj, list):
            return [self._resolve_secrets_in_config(item) for item in obj]
        if isinstance(obj, str) and obj.startswith(self.SECRET_REF_PREFIX):
            return self._resolve_secret_reference(obj[len(self.SECRET_REF_PREFIX):])
        return obj

    def load_config(self) -> Dict[str, Any]:
        """Load configuration files, resolve secrets, and return the result.

        YAML (.yaml/.yml) and JSON (.json) files are read. Other extensions and
        missing files are skipped. Files are merged shallowly: a later file's
        top-level key replaces an earlier one's. The SecretsManager is added
        under the key ``'secrets_manager'``.

        Raises:
            Exception: any parse or resolution error, after logging it.
        """
        try:
            config: Dict[str, Any] = {}
            files_loaded = 0
            for file_path in self.config_files:
                if not os.path.exists(file_path):
                    continue
                if file_path.endswith(('.yaml', '.yml')):
                    import yaml  # lazy: JSON-only setups don't need PyYAML
                    with open(file_path, 'r', encoding='utf-8') as f:
                        file_config = yaml.safe_load(f)
                elif file_path.endswith('.json'):
                    with open(file_path, 'r', encoding='utf-8') as f:
                        file_config = json.load(f)
                else:
                    continue
                files_loaded += 1
                if isinstance(file_config, dict):
                    config.update(file_config)

            resolved_config = self._resolve_secrets_in_config(config)
            # Expose the secrets manager to other components via the config.
            resolved_config['secrets_manager'] = self.secrets_manager
            self.config = resolved_config
            self.logger.info(
                "Configuration loaded with secrets resolved",
                files_loaded=files_loaded
            )
            return resolved_config
        except Exception as e:
            self.logger.error("Failed to load configuration", error=str(e))
            raise
# =============================================================================
# Usage Examples
# =============================================================================
async def example_usage():
    """Example usage of the secrets management system.

    Loads configuration (resolving ``secret://`` references), stores and reads
    a secret, lists secrets and runs a backend health check.
    """
    secrets_config = {
        'primary_backend': 'local',
        'local_enabled': True,
        'local': {
            # A fresh key per run: secrets saved by an earlier run cannot be
            # decrypted. Real deployments should load a persisted key instead.
            'encryption_key': base64.b64encode(Fernet.generate_key()).decode(),
            'storage_path': './secrets'
        },
        # SecretsManager reads the nested 'enabled' flags.
        'vault': {'enabled': False},  # Enable (with 'url' and 'token') for production with Vault
        'aws': {'enabled': False},    # Enable for AWS deployments
        'env_enabled': True           # Always enabled for development
    }

    loader = ConfigLoader(
        config_files=['config/default.yaml', 'config/secrets.yaml'],
        secrets_config=secrets_config
    )

    # load_config() is synchronous and may call asyncio.run() internally, which
    # is not allowed on a thread that is already running an event loop.
    # Run it in the default executor instead.
    loop = asyncio.get_running_loop()
    config = await loop.run_in_executor(None, loader.load_config)

    secrets_manager = config['secrets_manager']

    await secrets_manager.set_secret(
        key='database_password',
        value='super_secret_password_123',
        backend='local',
        ttl=3600
    )

    password = await secrets_manager.get_secret('database_password')
    # Never print secret values; only show whether one was found.
    print(f"Retrieved password: {'<redacted>' if password is not None else None}")

    secrets = await secrets_manager.list_secrets()
    print(f"Available secrets: {secrets}")

    health = await secrets_manager.health_check()
    print(f"Backend health: {health}")
if __name__ == "__main__":
    # `time` (used by SecretsManager.health_check) is imported at module level,
    # so it no longer needs to be injected here.
    asyncio.run(example_usage())