File size: 5,199 Bytes
c2ea5ed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
"""
Encryption Service for Secure API Key Storage

Provides proper encryption/decryption for sensitive data like API keys and secrets.
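Uses Fernet symmetric encryption with a key read from the
AGENT_GRAPH_ENCRYPTION_KEY environment variable, or a freshly generated
key when that variable is not set.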
"""

import os
import base64
import logging
from typing import Optional

# The `cryptography` package is treated as optional. When the import fails,
# CRYPTOGRAPHY_AVAILABLE is False and the names Fernet, hashes and PBKDF2HMAC
# are left undefined in this module.
try:
    from cryptography.fernet import Fernet
    from cryptography.hazmat.primitives import hashes
    from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
    CRYPTOGRAPHY_AVAILABLE = True
except ImportError:
    CRYPTOGRAPHY_AVAILABLE = False

# Module-level logger shared by everything in this file.
logger = logging.getLogger("agent_monitoring_server.encryption")

class EncryptionService:
    """Encrypt and decrypt sensitive strings such as API keys and secrets.

    The service wraps a Fernet instance keyed from the
    ``AGENT_GRAPH_ENCRYPTION_KEY`` environment variable. When that variable
    is unset, a random key is generated for the lifetime of this object, so
    values encrypted with it cannot be decrypted by another instance.

    WARNING: when Fernet cannot be initialised (for example because the
    ``cryptography`` package is missing or the configured key is invalid),
    ``encrypt`` degrades to plain base64 encoding, which is NOT encryption.
    Use ``is_encryption_available`` to detect this situation.
    """

    # Name of the environment variable that carries the Fernet key.
    _KEY_ENV_VAR = 'AGENT_GRAPH_ENCRYPTION_KEY'

    def __init__(self):
        """Create the service and try to set up Fernet encryption."""
        self._fernet = None
        self._initialize_encryption()

    def _initialize_encryption(self):
        """Build the Fernet instance from the environment key or a new key.

        Never raises: on any failure the error is logged and ``self._fernet``
        is left as ``None``, which switches the service to its base64
        fallback.
        """
        # Check the import flag explicitly instead of relying on a NameError
        # from the missing Fernet symbol being swallowed further down.
        if not CRYPTOGRAPHY_AVAILABLE:
            logger.error(
                "Failed to initialize encryption service: %s",
                "cryptography package is not installed",
            )
            self._fernet = None
            return

        try:
            encryption_key = os.environ.get(self._KEY_ENV_VAR)

            if not encryption_key:
                logger.warning("No encryption key found in environment. Generating a new one.")
                logger.warning("Set AGENT_GRAPH_ENCRYPTION_KEY environment variable for persistence.")

                # The key material is already uniformly random, so stretching
                # it through a KDF adds nothing; generate a Fernet key directly.
                encryption_key = self.generate_key()

                # The generated key is intentionally not logged or persisted.
                logger.info("Generated new encryption key. Consider setting AGENT_GRAPH_ENCRYPTION_KEY.")

            self._fernet = Fernet(encryption_key.encode('utf-8'))
            logger.info("Encryption service initialized successfully")

        except Exception as e:
            # Typically a malformed key in the environment variable.
            logger.error("Failed to initialize encryption service: %s", e)
            self._fernet = None

    def encrypt(self, plaintext: str) -> str:
        """
        Encrypt a plaintext string

        Args:
            plaintext: The string to encrypt

        Returns:
            The Fernet token, base64 (url-safe) encoded once more, as text.
            An empty or ``None`` input yields ``""``. If Fernet is
            unavailable or the encryption call fails, the plain base64
            encoding of ``plaintext`` is returned instead (not encrypted).
        """
        if not plaintext:
            return ""

        if self._fernet is not None:
            try:
                token = self._fernet.encrypt(plaintext.encode('utf-8'))
                # The extra base64 layer is part of the stored format and is
                # kept so that previously stored values stay readable.
                return base64.urlsafe_b64encode(token).decode('utf-8')
            except Exception as e:
                logger.error("Encryption failed: %s", e)
        else:
            logger.warning("Using base64 fallback - encryption not available")

        # Best-effort fallback: plain base64, reversible by anyone.
        return base64.b64encode(plaintext.encode('utf-8')).decode('utf-8')

    def decrypt(self, encrypted_text: str) -> str:
        """
        Decrypt an encrypted string

        Args:
            encrypted_text: The encrypted string to decrypt

        Returns:
            The decrypted plaintext. An empty or ``None`` input yields
            ``""``. If Fernet decryption is unavailable or fails, the value
            is decoded as plain base64; if that fails too, ``encrypted_text``
            is returned unchanged.
        """
        if not encrypted_text:
            return ""

        try:
            if self._fernet is not None:
                try:
                    token = base64.urlsafe_b64decode(encrypted_text.encode('utf-8'))
                    return self._fernet.decrypt(token).decode('utf-8')
                except Exception:
                    # NOTE(review): a value encrypted under a different key
                    # also ends up here and may base64-decode to something
                    # other than the original secret - confirm whether
                    # callers need to tell these cases apart.
                    logger.warning("Proper decryption failed, trying base64 fallback")

            # Values written by the base64 fallback of encrypt().
            return base64.b64decode(encrypted_text.encode('utf-8')).decode('utf-8')

        except Exception as e:
            logger.error("Decryption failed: %s", e)
            # Hand back the input untouched when nothing could decode it.
            return encrypted_text

    def is_encryption_available(self) -> bool:
        """Return True when a Fernet instance is set up, False in fallback mode."""
        return self._fernet is not None

    def generate_key(self) -> str:
        """Generate a new encryption key

        Returns:
            A fresh Fernet-format key as text: 32 random bytes from
            ``os.urandom`` in url-safe base64 (44 characters). Built with the
            standard library only, so it also works when the ``cryptography``
            package is not installed.
        """
        return base64.urlsafe_b64encode(os.urandom(32)).decode('utf-8')

# Global encryption service instance.
# Constructed at import time, so importing this module already runs the
# service's initialization (including the environment lookup for the key).
encryption_service = EncryptionService()