AgentGraph / backend /services /encryption_service.py
wu981526092's picture
🚀 Deploy AgentGraph: Complete agent monitoring and knowledge graph system
c2ea5ed
"""
Encryption Service for Secure API Key Storage
Provides proper encryption/decryption for sensitive data like API keys and secrets.
Uses Fernet symmetric encryption with a derived key.
"""
import os
import base64
import logging
from typing import Optional
try:
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
CRYPTOGRAPHY_AVAILABLE = True
except ImportError:
CRYPTOGRAPHY_AVAILABLE = False
logger = logging.getLogger("agent_monitoring_server.encryption")
class EncryptionService:
"""Service for encrypting and decrypting sensitive data"""
def __init__(self):
self._fernet = None
self._initialize_encryption()
def _initialize_encryption(self):
"""Initialize the encryption key from environment or generate one"""
try:
# Try to get encryption key from environment
encryption_key = os.environ.get('AGENT_GRAPH_ENCRYPTION_KEY')
if not encryption_key:
# Generate a new key if none exists
logger.warning("No encryption key found in environment. Generating a new one.")
logger.warning("Set AGENT_GRAPH_ENCRYPTION_KEY environment variable for persistence.")
# Generate a random password and salt for key derivation
password = os.urandom(32)
salt = os.urandom(16)
# Derive key using PBKDF2
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100000,
)
key = base64.urlsafe_b64encode(kdf.derive(password))
encryption_key = key.decode('utf-8')
# Store the key (in production, this should be handled more securely)
logger.info("Generated new encryption key. Consider setting AGENT_GRAPH_ENCRYPTION_KEY.")
# Create Fernet instance
self._fernet = Fernet(encryption_key.encode('utf-8'))
logger.info("Encryption service initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize encryption service: {str(e)}")
# Fallback to base64 if encryption fails
self._fernet = None
def encrypt(self, plaintext: str) -> str:
"""
Encrypt a plaintext string
Args:
plaintext: The string to encrypt
Returns:
Encrypted string (base64 encoded)
"""
if not plaintext:
return ""
try:
if self._fernet:
# Use proper encryption
encrypted_bytes = self._fernet.encrypt(plaintext.encode('utf-8'))
return base64.urlsafe_b64encode(encrypted_bytes).decode('utf-8')
else:
# Fallback to base64 encoding
logger.warning("Using base64 fallback - encryption not available")
return base64.b64encode(plaintext.encode('utf-8')).decode('utf-8')
except Exception as e:
logger.error(f"Encryption failed: {str(e)}")
# Fallback to base64
return base64.b64encode(plaintext.encode('utf-8')).decode('utf-8')
def decrypt(self, encrypted_text: str) -> str:
"""
Decrypt an encrypted string
Args:
encrypted_text: The encrypted string to decrypt
Returns:
Decrypted plaintext string
"""
if not encrypted_text:
return ""
try:
if self._fernet:
# Try proper decryption first
try:
encrypted_bytes = base64.urlsafe_b64decode(encrypted_text.encode('utf-8'))
decrypted_bytes = self._fernet.decrypt(encrypted_bytes)
return decrypted_bytes.decode('utf-8')
except Exception:
# If proper decryption fails, try base64 fallback
logger.warning("Proper decryption failed, trying base64 fallback")
return base64.b64decode(encrypted_text.encode('utf-8')).decode('utf-8')
else:
# Use base64 decoding
return base64.b64decode(encrypted_text.encode('utf-8')).decode('utf-8')
except Exception as e:
logger.error(f"Decryption failed: {str(e)}")
# Return the original text if all decryption methods fail
return encrypted_text
def is_encryption_available(self) -> bool:
"""Check if proper encryption is available"""
return self._fernet is not None
def generate_key(self) -> str:
"""Generate a new encryption key"""
return Fernet.generate_key().decode('utf-8')
# Global encryption service instance
encryption_service = EncryptionService()